2
Read/Write AMQP frames over network transports.
4
2009-01-14 Barry Pederson <bp@barryp.org>
7
# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
9
# This library is free software; you can redistribute it and/or
10
# modify it under the terms of the GNU Lesser General Public
11
# License as published by the Free Software Foundation; either
12
# version 2.1 of the License, or (at your option) any later version.
14
# This library is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17
# Lesser General Public License for more details.
19
# You should have received a copy of the GNU Lesser General Public
20
# License along with this library; if not, write to the Free Software
21
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
26
# See if Python 2.6+ SSL support is available
34
from struct import pack, unpack
38
# Yes, Advanced Message Queuing Protocol Protocol is redundant
39
AMQP_PROTOCOL_HEADER = 'AMQP\x01\x01\x09\x01'
42
class _AbstractTransport(object):
44
Common superclass for TCP and SSL transports
47
def __init__(self, host, connect_timeout):
49
host, port = host.split(':', 1)
54
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
55
self.sock.settimeout(connect_timeout)
58
self.sock.connect((host, port))
62
self.sock.settimeout(None)
64
self._setup_transport()
66
self._write(AMQP_PROTOCOL_HEADER)
75
Read exactly n bytes from the peer
78
raise NotImplementedError('Must be overriden in subclass')
81
def _setup_transport(self):
83
Do any additional initialization of the class (used
92
Completely write a string to the peer.
95
raise NotImplementedError('Must be overriden in subclass')
99
if self.sock is not None:
104
def read_frame(self):
109
frame_type, channel, size = unpack('>BHI', self._read(7))
110
payload = self._read(size)
113
return frame_type, channel, payload
115
raise Exception('Framing Error, received 0x%02x while expecting 0xce' % ord(ch))
118
def write_frame(self, frame_type, channel, payload):
120
Write out an AMQP frame.
124
self._write(pack('>BHI%dsB' % size,
125
frame_type, channel, size, payload, 0xce))
128
class SSLTransport(_AbstractTransport):
130
Transport that works over SSL
133
def _setup_transport(self):
135
Wrap the socket in an SSL object, either the
136
new Python 2.6 version, or the older Python 2.5 and
141
self.sslobj = ssl.wrap_socket(self.sock)
142
self.sslobj.do_handshake()
144
self.sslobj = socket.ssl(self.sock)
149
It seems that SSL Objects read() method may not supply as much
150
as you're asking for, at least with extremely large messages.
151
somewhere > 16K - found this in the test_channel.py test_large
155
result = self.sslobj.read(n)
157
while len(result) < n:
158
s = self.sslobj.read(n - len(result))
160
raise IOError('Socket closed')
168
Write a string out to the SSL socket fully.
172
n = self.sslobj.write(s)
174
raise IOError('Socket closed')
179
class TCPTransport(_AbstractTransport):
181
Transport that deals directly with TCP socket.
184
def _setup_transport(self):
186
Setup to _write() directly to the socket, and
187
do our own buffered reads.
190
self._write = self.sock.sendall
191
self._read_buffer = ''
196
Read exactly n bytes from the socket
199
while len(self._read_buffer) < n:
200
s = self.sock.recv(65536)
202
raise IOError('Socket closed')
203
self._read_buffer += s
205
result = self._read_buffer[:n]
206
self._read_buffer = self._read_buffer[n:]
211
def create_transport(host, connect_timeout, ssl=False):
213
Given a few parameters from the Connection constructor,
214
select and create a subclass of _AbstractTransport.
218
return SSLTransport(host, connect_timeout)
220
return TCPTransport(host, connect_timeout)