~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/amqplib/client_0_8/transport.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
Read/Write AMQP frames over network transports.
 
3
 
 
4
2009-01-14 Barry Pederson <bp@barryp.org>
 
5
 
 
6
"""
 
7
# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
 
8
#
 
9
# This library is free software; you can redistribute it and/or
 
10
# modify it under the terms of the GNU Lesser General Public
 
11
# License as published by the Free Software Foundation; either
 
12
# version 2.1 of the License, or (at your option) any later version.
 
13
#
 
14
# This library is distributed in the hope that it will be useful,
 
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
17
# Lesser General Public License for more details.
 
18
#
 
19
# You should have received a copy of the GNU Lesser General Public
 
20
# License along with this library; if not, write to the Free Software
 
21
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
 
22
 
 
23
import socket
 
24
 
 
25
#
 
26
# See if Python 2.6+ SSL support is available
 
27
#
 
28
try:
 
29
    import ssl
 
30
    HAVE_PY26_SSL = True
 
31
except:
 
32
    HAVE_PY26_SSL = False
 
33
 
 
34
from struct import pack, unpack
 
35
 
 
36
AMQP_PORT = 5672
 
37
 
 
38
# Yes, Advanced Message Queuing Protocol Protocol is redundant
 
39
AMQP_PROTOCOL_HEADER = 'AMQP\x01\x01\x09\x01'
 
40
 
 
41
 
 
42
class _AbstractTransport(object):
 
43
    """
 
44
    Common superclass for TCP and SSL transports
 
45
 
 
46
    """
 
47
    def __init__(self, host, connect_timeout):
 
48
        if ':' in host:
 
49
            host, port = host.split(':', 1)
 
50
            port = int(port)
 
51
        else:
 
52
            port = AMQP_PORT
 
53
 
 
54
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
55
        self.sock.settimeout(connect_timeout)
 
56
 
 
57
        try:
 
58
            self.sock.connect((host, port))
 
59
        except socket.error:
 
60
            self.sock.close()
 
61
            raise
 
62
        self.sock.settimeout(None)
 
63
 
 
64
        self._setup_transport()
 
65
 
 
66
        self._write(AMQP_PROTOCOL_HEADER)
 
67
 
 
68
 
 
69
    def __del__(self):
 
70
        self.close()
 
71
 
 
72
 
 
73
    def _read(self, n):
 
74
        """
 
75
        Read exactly n bytes from the peer
 
76
 
 
77
        """
 
78
        raise NotImplementedError('Must be overriden in subclass')
 
79
 
 
80
 
 
81
    def _setup_transport(self):
 
82
        """
 
83
        Do any additional initialization of the class (used
 
84
        by the subclasses).
 
85
 
 
86
        """
 
87
        pass
 
88
 
 
89
 
 
90
    def _write(self, s):
 
91
        """
 
92
        Completely write a string to the peer.
 
93
 
 
94
        """
 
95
        raise NotImplementedError('Must be overriden in subclass')
 
96
 
 
97
 
 
98
    def close(self):
 
99
        if self.sock is not None:
 
100
            self.sock.close()
 
101
            self.sock = None
 
102
 
 
103
 
 
104
    def read_frame(self):
 
105
        """
 
106
        Read an AMQP frame.
 
107
 
 
108
        """
 
109
        frame_type, channel, size = unpack('>BHI', self._read(7))
 
110
        payload = self._read(size)
 
111
        ch = self._read(1)
 
112
        if ch == '\xce':
 
113
            return frame_type, channel, payload
 
114
        else:
 
115
            raise Exception('Framing Error, received 0x%02x while expecting 0xce' % ord(ch))
 
116
 
 
117
 
 
118
    def write_frame(self, frame_type, channel, payload):
 
119
        """
 
120
        Write out an AMQP frame.
 
121
 
 
122
        """
 
123
        size = len(payload)
 
124
        self._write(pack('>BHI%dsB' % size,
 
125
            frame_type, channel, size, payload, 0xce))
 
126
 
 
127
 
 
128
class SSLTransport(_AbstractTransport):
 
129
    """
 
130
    Transport that works over SSL
 
131
 
 
132
    """
 
133
    def _setup_transport(self):
 
134
        """
 
135
        Wrap the socket in an SSL object, either the
 
136
        new Python 2.6 version, or the older Python 2.5 and
 
137
        lower version.
 
138
 
 
139
        """
 
140
        if HAVE_PY26_SSL:
 
141
            self.sslobj = ssl.wrap_socket(self.sock)
 
142
            self.sslobj.do_handshake()
 
143
        else:
 
144
            self.sslobj = socket.ssl(self.sock)
 
145
 
 
146
 
 
147
    def _read(self, n):
 
148
        """
 
149
        It seems that SSL Objects read() method may not supply as much
 
150
        as you're asking for, at least with extremely large messages.
 
151
        somewhere > 16K - found this in the test_channel.py test_large
 
152
        unittest.
 
153
 
 
154
        """
 
155
        result = self.sslobj.read(n)
 
156
 
 
157
        while len(result) < n:
 
158
            s = self.sslobj.read(n - len(result))
 
159
            if not s:
 
160
                raise IOError('Socket closed')
 
161
            result += s
 
162
 
 
163
        return result
 
164
 
 
165
 
 
166
    def _write(self, s):
 
167
        """
 
168
        Write a string out to the SSL socket fully.
 
169
 
 
170
        """
 
171
        while s:
 
172
            n = self.sslobj.write(s)
 
173
            if not n:
 
174
                raise IOError('Socket closed')
 
175
            s = s[n:]
 
176
 
 
177
 
 
178
 
 
179
class TCPTransport(_AbstractTransport):
 
180
    """
 
181
    Transport that deals directly with TCP socket.
 
182
 
 
183
    """
 
184
    def _setup_transport(self):
 
185
        """
 
186
        Setup to _write() directly to the socket, and
 
187
        do our own buffered reads.
 
188
 
 
189
        """
 
190
        self._write = self.sock.sendall
 
191
        self._read_buffer = ''
 
192
 
 
193
 
 
194
    def _read(self, n):
 
195
        """
 
196
        Read exactly n bytes from the socket
 
197
 
 
198
        """
 
199
        while len(self._read_buffer) < n:
 
200
            s = self.sock.recv(65536)
 
201
            if not s:
 
202
                raise IOError('Socket closed')
 
203
            self._read_buffer += s
 
204
 
 
205
        result = self._read_buffer[:n]
 
206
        self._read_buffer = self._read_buffer[n:]
 
207
 
 
208
        return result
 
209
 
 
210
 
 
211
def create_transport(host, connect_timeout, ssl=False):
 
212
    """
 
213
    Given a few parameters from the Connection constructor,
 
214
    select and create a subclass of _AbstractTransport.
 
215
 
 
216
    """
 
217
    if ssl:
 
218
        return SSLTransport(host, connect_timeout)
 
219
    else:
 
220
        return TCPTransport(host, connect_timeout)