2
Convert between frames and higher-level AMQP methods
5
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
7
# This library is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This library is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this library; if not, write to the Free Software
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
21
from Queue import Empty, Queue
22
from struct import pack, unpack
25
from collections import defaultdict
27
class defaultdict(dict):
29
Mini-implementation of collections.defaultdict that
30
appears in Python 2.5 and up.
33
def __init__(self, default_factory):
35
self.default_factory = default_factory
37
def __getitem__(self, key):
39
return dict.__getitem__(self, key)
41
result = self.default_factory()
42
dict.__setitem__(self, key, result)
46
from basic_message import Message
47
from exceptions import *
48
from serialization import AMQPReader
55
# MethodReader needs to know which methods are supposed
56
# to be followed by content headers and bodies.
59
(60, 50), # Basic.return
60
(60, 60), # Basic.deliver
61
(60, 71), # Basic.get_ok
65
class _PartialMessage(object):
67
Helper class to build up a multi-frame method.
70
def __init__(self, method_sig, args):
71
self.method_sig = method_sig
75
self.body_received = 0
80
def add_header(self, payload):
81
class_id, weight, self.body_size = unpack('>HHQ', payload[:12])
82
self.msg._load_properties(payload[12:])
83
self.complete = (self.body_size == 0)
86
def add_payload(self, payload):
87
self.body_parts.append(payload)
88
self.body_received += len(payload)
90
if self.body_received == self.body_size:
91
self.msg.body = ''.join(self.body_parts)
95
class MethodReader(object):
97
Helper class to receive frames from the broker, combine them if
98
necessary with content-headers and content-bodies into complete methods.
100
Normally a method is represented as a tuple containing
101
(channel, method_sig, args, content).
103
In the case of a framing error, an AMQPConnectionException is placed
106
In the case of unexpected frames, a tuple made up of
107
(channel, AMQPChannelException) is placed in the queue.
110
def __init__(self, source):
114
self.partial_messages = {}
115
# For each channel, which type is expected next
116
self.expected_types = defaultdict(lambda:1)
119
def _next_method(self):
121
Read the next method from the source, once one complete method has
122
been assembled it is placed in the internal queue.
125
while self.queue.empty():
127
frame_type, channel, payload = self.source.read_frame()
130
# Connection was closed? Framing Error?
135
if self.expected_types[channel] != frame_type:
138
Exception('Received frame type %s while expecting type: %s' %
139
(frame_type, self.expected_types[channel])
142
elif frame_type == 1:
143
self._process_method_frame(channel, payload)
144
elif frame_type == 2:
145
self._process_content_header(channel, payload)
146
elif frame_type == 3:
147
self._process_content_body(channel, payload)
150
def _process_method_frame(self, channel, payload):
152
Process Method frames
155
method_sig = unpack('>HH', payload[:4])
156
args = AMQPReader(payload[4:])
158
if method_sig in _CONTENT_METHODS:
160
# Save what we've got so far and wait for the content-header
162
self.partial_messages[channel] = _PartialMessage(method_sig, args)
163
self.expected_types[channel] = 2
165
self.queue.put((channel, method_sig, args, None))
168
def _process_content_header(self, channel, payload):
170
Process Content Header frames
173
partial = self.partial_messages[channel]
174
partial.add_header(payload)
178
# a bodyless message, we're done
180
self.queue.put((channel, partial.method_sig, partial.args, partial.msg))
181
del self.partial_messages[channel]
182
self.expected_types[channel] = 1
185
# wait for the content-body
187
self.expected_types[channel] = 3
190
def _process_content_body(self, channel, payload):
192
Process Content Body frames
195
partial = self.partial_messages[channel]
196
partial.add_payload(payload)
199
# Stick the message in the queue and go back to
200
# waiting for method frames
202
self.queue.put((channel, partial.method_sig, partial.args, partial.msg))
203
del self.partial_messages[channel]
204
self.expected_types[channel] = 1
207
def read_method(self):
209
Read a method from the peer.
214
if isinstance(m, Exception):
219
class MethodWriter(object):
221
Convert AMQP methods into AMQP frames and send them out
225
def __init__(self, dest, frame_max):
227
self.frame_max = frame_max
230
def write_method(self, channel, method_sig, args, content=None):
231
payload = pack('>HH', method_sig[0], method_sig[1]) + args
233
self.dest.write_frame(1, channel, payload)
237
payload = pack('>HHQ', method_sig[0], 0, len(body)) + \
238
content._serialize_properties()
240
self.dest.write_frame(2, channel, payload)
243
payload, body = body[:self.frame_max - 8], body[self.frame_max -8:]
244
self.dest.write_frame(3, channel, payload)