~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/carrot/backends/pystomp.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from stompy import Client
 
2
from stompy import Empty as QueueEmpty
 
3
from carrot.backends.base import BaseMessage, BaseBackend
 
4
from itertools import count
 
5
import socket
 
6
 
 
7
DEFAULT_PORT = 61613
 
8
 
 
9
 
 
10
class Message(BaseMessage):
 
11
    """A message received by the STOMP broker.
 
12
 
 
13
    Usually you don't insantiate message objects yourself, but receive
 
14
    them using a :class:`carrot.messaging.Consumer`.
 
15
 
 
16
    :param backend: see :attr:`backend`.
 
17
    :param frame: see :attr:`_frame`.
 
18
 
 
19
    .. attribute:: body
 
20
 
 
21
        The message body.
 
22
 
 
23
    .. attribute:: delivery_tag
 
24
 
 
25
        The message delivery tag, uniquely identifying this message.
 
26
 
 
27
    .. attribute:: backend
 
28
 
 
29
        The message backend used.
 
30
        A subclass of :class:`carrot.backends.base.BaseBackend`.
 
31
 
 
32
    .. attribute:: _frame
 
33
 
 
34
        The frame received by the STOMP client. This is considered a private
 
35
        variable and should never be used in production code.
 
36
 
 
37
    """
 
38
 
 
39
    def __init__(self, backend, frame, **kwargs):
 
40
        self._frame = frame
 
41
        self.backend = backend
 
42
 
 
43
        kwargs["body"] = frame.body
 
44
        kwargs["delivery_tag"] = frame.headers["message-id"]
 
45
        kwargs["content_type"] = frame.headers.get("content-type")
 
46
        kwargs["content_encoding"] = frame.headers.get("content-encoding")
 
47
        kwargs["priority"] = frame.headers.get("priority")
 
48
 
 
49
        super(Message, self).__init__(backend, **kwargs)
 
50
 
 
51
    def ack(self):
 
52
        """Acknowledge this message as being processed.,
 
53
        This will remove the message from the queue.
 
54
 
 
55
        :raises MessageStateError: If the message has already been
 
56
            acknowledged/requeued/rejected.
 
57
 
 
58
        """
 
59
        if self.acknowledged:
 
60
            raise self.MessageStateError(
 
61
                "Message already acknowledged with state: %s" % self._state)
 
62
        self.backend.ack(self._frame)
 
63
        self._state = "ACK"
 
64
 
 
65
    def reject(self):
 
66
        raise NotImplementedError(
 
67
            "The STOMP backend does not implement basic.reject")
 
68
 
 
69
    def requeue(self):
 
70
        raise NotImplementedError(
 
71
            "The STOMP backend does not implement requeue")
 
72
 
 
73
 
 
74
class Backend(BaseBackend):
 
75
    Stomp = Client
 
76
    Message = Message
 
77
    default_port = DEFAULT_PORT
 
78
 
 
79
    def __init__(self, connection, **kwargs):
 
80
        self.connection = connection
 
81
        self.default_port = kwargs.get("default_port", self.default_port)
 
82
        self._channel = None
 
83
        self._consumers = {} # open consumers by consumer tag
 
84
        self._callbacks = {}
 
85
 
 
86
    def establish_connection(self):
 
87
        conninfo = self.connection
 
88
        if not conninfo.port:
 
89
            conninfo.port = self.default_port
 
90
        stomp = self.Stomp(conninfo.hostname, conninfo.port)
 
91
        stomp.connect()
 
92
        return stomp
 
93
 
 
94
    def close_connection(self, connection):
 
95
        try:
 
96
            connection.disconnect()
 
97
        except socket.error:
 
98
            pass
 
99
 
 
100
    def queue_exists(self, queue):
 
101
        return True
 
102
 
 
103
    def queue_purge(self, queue, **kwargs):
 
104
        for purge_count in count(0):
 
105
            try:
 
106
                frame = self.channel.get_nowait()
 
107
            except QueueEmpty:
 
108
                return purge_count
 
109
            else:
 
110
                self.channel.ack(frame)
 
111
 
 
112
    def declare_consumer(self, queue, no_ack, callback, consumer_tag,
 
113
            **kwargs):
 
114
        ack = no_ack and "auto" or "client"
 
115
        self.channel.subscribe(queue, ack=ack)
 
116
        self._consumers[consumer_tag] = queue
 
117
        self._callbacks[queue] = callback
 
118
 
 
119
    def consume(self, limit=None):
 
120
        """Returns an iterator that waits for one message at a time."""
 
121
        for total_message_count in count():
 
122
            if limit and total_message_count >= limit:
 
123
                raise StopIteration
 
124
            while True:
 
125
                frame = self.channel.get()
 
126
                if frame:
 
127
                    break
 
128
            queue = frame.headers.get("destination")
 
129
 
 
130
            if not queue or queue not in self._callbacks:
 
131
                continue
 
132
 
 
133
            self._callbacks[queue](frame)
 
134
 
 
135
            yield True
 
136
 
 
137
    def queue_declare(self, queue, *args, **kwargs):
 
138
        self.channel.subscribe(queue, ack="client")
 
139
 
 
140
    def get(self, queue, no_ack=False):
 
141
        try:
 
142
            frame = self.channel.get_nowait()
 
143
        except QueueEmpty:
 
144
            return None
 
145
        else:
 
146
            return self.message_to_python(frame)
 
147
 
 
148
    def ack(self, frame):
 
149
        self.channel.ack(frame)
 
150
 
 
151
    def message_to_python(self, raw_message):
 
152
        """Convert encoded message body back to a Python value."""
 
153
        return self.Message(backend=self, frame=raw_message)
 
154
 
 
155
    def prepare_message(self, message_data, delivery_mode, priority=0,
 
156
            content_type=None, content_encoding=None):
 
157
        persistent = "false"
 
158
        if delivery_mode == 2:
 
159
            persistent = "true"
 
160
        priority = priority or 0
 
161
        return {"body": message_data,
 
162
                "persistent": persistent,
 
163
                "priority": priority,
 
164
                "content-encoding": content_encoding,
 
165
                "content-type": content_type}
 
166
 
 
167
    def publish(self, message, exchange, routing_key, **kwargs):
 
168
        message["destination"] = exchange
 
169
        self.channel.stomp.send(message)
 
170
 
 
171
    def cancel(self, consumer_tag):
 
172
        if not self._channel or consumer_tag not in self._consumers:
 
173
            return
 
174
        queue = self._consumers.pop(consumer_tag)
 
175
        self.channel.unsubscribe(queue)
 
176
 
 
177
    def close(self):
 
178
        for consumer_tag in self._consumers.keys():
 
179
            self.cancel(consumer_tag)
 
180
        if self._channel:
 
181
            try:
 
182
                self._channel.disconnect()
 
183
            except socket.error:
 
184
                pass
 
185
 
 
186
    @property
 
187
    def channel(self):
 
188
        if not self._channel:
 
189
            # Sorry, but the python-stomp library needs one connection
 
190
            # for each channel.
 
191
            self._channel = self.establish_connection()
 
192
        return self._channel