~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/amqplib/client_0_8/method_framing.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Convert between frames and higher-level AMQP methods
 
3
 
 
4
"""
 
5
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
 
6
#
 
7
# This library is free software; you can redistribute it and/or
 
8
# modify it under the terms of the GNU Lesser General Public
 
9
# License as published by the Free Software Foundation; either
 
10
# version 2.1 of the License, or (at your option) any later version.
 
11
#
 
12
# This library is distributed in the hope that it will be useful,
 
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
15
# Lesser General Public License for more details.
 
16
#
 
17
# You should have received a copy of the GNU Lesser General Public
 
18
# License along with this library; if not, write to the Free Software
 
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
 
20
 
 
21
from Queue import Empty, Queue
 
22
from struct import pack, unpack
 
23
 
 
24
try:
 
25
    from collections import defaultdict
 
26
except:
 
27
    class defaultdict(dict):
 
28
        """
 
29
        Mini-implementation of collections.defaultdict that
 
30
        appears in Python 2.5 and up.
 
31
 
 
32
        """
 
33
        def __init__(self, default_factory):
 
34
            dict.__init__(self)
 
35
            self.default_factory = default_factory
 
36
 
 
37
        def __getitem__(self, key):
 
38
            try:
 
39
                return dict.__getitem__(self, key)
 
40
            except KeyError:
 
41
                result = self.default_factory()
 
42
                dict.__setitem__(self, key, result)
 
43
                return result
 
44
 
 
45
 
 
46
from basic_message import Message
 
47
from exceptions import *
 
48
from serialization import AMQPReader
 
49
 
 
50
__all__ =  [
 
51
            'MethodReader',
 
52
           ]
 
53
 
 
54
#
 
55
# MethodReader needs to know which methods are supposed
 
56
# to be followed by content headers and bodies.
 
57
#
 
58
_CONTENT_METHODS = [
 
59
    (60, 50), # Basic.return
 
60
    (60, 60), # Basic.deliver
 
61
    (60, 71), # Basic.get_ok
 
62
    ]
 
63
 
 
64
 
 
65
class _PartialMessage(object):
 
66
    """
 
67
    Helper class to build up a multi-frame method.
 
68
 
 
69
    """
 
70
    def __init__(self, method_sig, args):
 
71
        self.method_sig = method_sig
 
72
        self.args = args
 
73
        self.msg = Message()
 
74
        self.body_parts = []
 
75
        self.body_received = 0
 
76
        self.body_size = None
 
77
        self.complete = False
 
78
 
 
79
 
 
80
    def add_header(self, payload):
 
81
        class_id, weight, self.body_size = unpack('>HHQ', payload[:12])
 
82
        self.msg._load_properties(payload[12:])
 
83
        self.complete = (self.body_size == 0)
 
84
 
 
85
 
 
86
    def add_payload(self, payload):
 
87
        self.body_parts.append(payload)
 
88
        self.body_received += len(payload)
 
89
 
 
90
        if self.body_received == self.body_size:
 
91
            self.msg.body = ''.join(self.body_parts)
 
92
            self.complete = True
 
93
 
 
94
 
 
95
class MethodReader(object):
 
96
    """
 
97
    Helper class to receive frames from the broker, combine them if
 
98
    necessary with content-headers and content-bodies into complete methods.
 
99
 
 
100
    Normally a method is represented as a tuple containing
 
101
    (channel, method_sig, args, content).
 
102
 
 
103
    In the case of a framing error, an AMQPConnectionException is placed
 
104
    in the queue.
 
105
 
 
106
    In the case of unexpected frames, a tuple made up of
 
107
    (channel, AMQPChannelException) is placed in the queue.
 
108
 
 
109
    """
 
110
    def __init__(self, source):
 
111
        self.source = source
 
112
        self.queue = Queue()
 
113
        self.running = False
 
114
        self.partial_messages = {}
 
115
        # For each channel, which type is expected next
 
116
        self.expected_types = defaultdict(lambda:1)
 
117
 
 
118
 
 
119
    def _next_method(self):
 
120
        """
 
121
        Read the next method from the source, once one complete method has
 
122
        been assembled it is placed in the internal queue.
 
123
 
 
124
        """
 
125
        while self.queue.empty():
 
126
            try:
 
127
                frame_type, channel, payload = self.source.read_frame()
 
128
            except Exception, e:
 
129
                #
 
130
                # Connection was closed?  Framing Error?
 
131
                #
 
132
                self.queue.put(e)
 
133
                break
 
134
 
 
135
            if self.expected_types[channel] != frame_type:
 
136
                self.queue.put((
 
137
                    channel,
 
138
                    Exception('Received frame type %s while expecting type: %s' %
 
139
                        (frame_type, self.expected_types[channel])
 
140
                        )
 
141
                    ))
 
142
            elif frame_type == 1:
 
143
                self._process_method_frame(channel, payload)
 
144
            elif frame_type == 2:
 
145
                self._process_content_header(channel, payload)
 
146
            elif frame_type == 3:
 
147
                self._process_content_body(channel, payload)
 
148
 
 
149
 
 
150
    def _process_method_frame(self, channel, payload):
 
151
        """
 
152
        Process Method frames
 
153
 
 
154
        """
 
155
        method_sig = unpack('>HH', payload[:4])
 
156
        args = AMQPReader(payload[4:])
 
157
 
 
158
        if method_sig in _CONTENT_METHODS:
 
159
            #
 
160
            # Save what we've got so far and wait for the content-header
 
161
            #
 
162
            self.partial_messages[channel] = _PartialMessage(method_sig, args)
 
163
            self.expected_types[channel] = 2
 
164
        else:
 
165
            self.queue.put((channel, method_sig, args, None))
 
166
 
 
167
 
 
168
    def _process_content_header(self, channel, payload):
 
169
        """
 
170
        Process Content Header frames
 
171
 
 
172
        """
 
173
        partial = self.partial_messages[channel]
 
174
        partial.add_header(payload)
 
175
 
 
176
        if partial.complete:
 
177
            #
 
178
            # a bodyless message, we're done
 
179
            #
 
180
            self.queue.put((channel, partial.method_sig, partial.args, partial.msg))
 
181
            del self.partial_messages[channel]
 
182
            self.expected_types[channel] = 1
 
183
        else:
 
184
            #
 
185
            # wait for the content-body
 
186
            #
 
187
            self.expected_types[channel] = 3
 
188
 
 
189
 
 
190
    def _process_content_body(self, channel, payload):
 
191
        """
 
192
        Process Content Body frames
 
193
 
 
194
        """
 
195
        partial = self.partial_messages[channel]
 
196
        partial.add_payload(payload)
 
197
        if partial.complete:
 
198
            #
 
199
            # Stick the message in the queue and go back to
 
200
            # waiting for method frames
 
201
            #
 
202
            self.queue.put((channel, partial.method_sig, partial.args, partial.msg))
 
203
            del self.partial_messages[channel]
 
204
            self.expected_types[channel] = 1
 
205
 
 
206
 
 
207
    def read_method(self):
 
208
        """
 
209
        Read a method from the peer.
 
210
 
 
211
        """
 
212
        self._next_method()
 
213
        m = self.queue.get()
 
214
        if isinstance(m, Exception):
 
215
            raise m
 
216
        return m
 
217
 
 
218
 
 
219
class MethodWriter(object):
 
220
    """
 
221
    Convert AMQP methods into AMQP frames and send them out
 
222
    to the peer.
 
223
 
 
224
    """
 
225
    def __init__(self, dest, frame_max):
 
226
        self.dest = dest
 
227
        self.frame_max = frame_max
 
228
 
 
229
 
 
230
    def write_method(self, channel, method_sig, args, content=None):
 
231
        payload = pack('>HH', method_sig[0], method_sig[1]) + args
 
232
 
 
233
        self.dest.write_frame(1, channel, payload)
 
234
 
 
235
        if content:
 
236
            body = content.body
 
237
            payload = pack('>HHQ', method_sig[0], 0, len(body)) + \
 
238
                content._serialize_properties()
 
239
 
 
240
            self.dest.write_frame(2, channel, payload)
 
241
 
 
242
            while body:
 
243
                payload, body = body[:self.frame_max - 8], body[self.frame_max -8:]
 
244
                self.dest.write_frame(3, channel, payload)