5
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
7
# This library is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This library is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this library; if not, write to the Free Software
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
23
from abstract_channel import AbstractChannel
24
from channel import Channel
25
from exceptions import *
26
from method_framing import MethodReader, MethodWriter
27
from serialization import AMQPReader, AMQPWriter
28
from transport import create_transport
35
# Client property info that gets sent to the server on connection startup
37
LIBRARY_PROPERTIES = {
38
'library': 'Python amqplib',
39
'library_version': '0.6.1',
42
AMQP_LOGGER = logging.getLogger('amqplib')
45
class Connection(AbstractChannel):
47
The connection class provides methods for a client to establish a
48
network connection to a server, and for both peers to operate the
49
connection thereafter.
53
connection = open-connection *use-connection close-connection
54
open-connection = C:protocol-header
58
C:OPEN S:OPEN-OK | S:REDIRECT
59
challenge = S:SECURE C:SECURE-OK
60
use-connection = *channel
61
close-connection = C:CLOSE S:CLOSE-OK
69
login_method='AMQPLAIN',
73
client_properties=None,
79
Create a connection to the specified host, which should be
80
a 'host[:port]', such as 'localhost', or '1.2.3.4:5672'
81
(defaults to 'localhost', if a port is not specified then
84
If login_response is not specified, one is built up for you from
85
userid and password if they are present.
88
if (login_response is None) \
89
and (userid is not None) \
90
and (password is not None):
91
login_response = AMQPWriter()
92
login_response.write_table({'LOGIN': userid, 'PASSWORD': password})
93
login_response = login_response.getvalue()[4:] #Skip the length
97
d.update(LIBRARY_PROPERTIES)
99
d.update(client_properties)
101
self.known_hosts = ''
105
# The connection object itself is treated as channel 0
106
super(Connection, self).__init__(self, 0)
108
self.transport = None
110
# Properties set in the Tune method
111
self.channel_max = 65535
112
self.frame_max = 131072
115
# Properties set in the Start method
116
self.version_major = 0
117
self.version_minor = 0
118
self.server_properties = {}
122
# Let the transport.py module setup the actual
123
# socket connection to the broker.
125
self.transport = create_transport(host, connect_timeout, ssl)
127
self.method_reader = MethodReader(self.transport)
128
self.method_writer = MethodWriter(self.transport, self.frame_max)
130
self.wait(allowed_methods=[
134
self._x_start_ok(d, login_method, login_response, locale)
136
self._wait_tune_ok = True
137
while self._wait_tune_ok:
138
self.wait(allowed_methods=[
143
host = self._x_open(virtual_host, insist=insist)
145
# we weren't redirected
148
# we were redirected, close the socket, loop and try again
156
self.transport.close()
157
self.transport = None
159
temp_list = [x for x in self.channels.values() if x is not self]
163
self.connection = self.channels = None
166
def _get_free_channel_id(self):
167
for i in xrange(1, self.channel_max+1):
168
if i not in self.channels:
170
raise AMQPException('No free channel ids, current=%d, channel_max=%d'
171
% (len(self.channels), self.channel_max))
174
def _wait_method(self, channel_id, allowed_methods):
176
Wait for a method from the server destined for
177
a particular channel.
181
# Check the channel's deferred methods
183
method_queue = self.channels[channel_id].method_queue
185
for queued_method in method_queue:
186
method_sig = queued_method[0]
187
if (allowed_methods is None) \
188
or (method_sig in allowed_methods) \
189
or (method_sig == (20, 40)):
190
method_queue.remove(queued_method)
194
# Nothing queued, need to wait for a method from the peer
197
channel, method_sig, args, content = \
198
self.method_reader.read_method()
200
if (channel == channel_id) \
201
and ((allowed_methods is None) \
202
or (method_sig in allowed_methods) \
203
or (method_sig == (20, 40))):
204
return method_sig, args, content
207
# Not the channel and/or method we were looking for. Queue
208
# this method for later
210
self.channels[channel].method_queue.append((method_sig, args, content))
213
# If we just queued up a method for channel 0 (the Connection
214
# itself) it's probably a close method in reaction to some
215
# error, so deal with it right away.
221
def channel(self, channel_id=None):
    """
    Fetch a Channel object identified by the numeric channel_id, or
    create that object if it doesn't already exist.

    When channel_id is a key of self.channels the stored object is
    returned as-is; otherwise a new Channel(self, channel_id) is built
    and returned (including for the default channel_id of None).

    """
    if channel_id in self.channels:
        return self.channels[channel_id]

    return Channel(self, channel_id)
235
def close(self, reply_code=0, reply_text='', method_sig=(0, 0)):
237
request a connection close
239
This method indicates that the sender wants to close the
240
connection. This may be due to internal conditions (e.g. a
241
forced shut-down) or due to an error handling a specific
242
method, i.e. an exception. When a close is due to an
243
exception, the sender provides the class and method id of the
244
method which caused the exception.
248
After sending this method any received method except the
249
Close-OK method MUST be discarded.
253
The peer sending this method MAY use a counter or timeout
254
to detect failure of the other peer to respond correctly
255
with the Close-OK method.
259
When a server receives the Close method from a client it
260
MUST delete all server-side resources associated with the
261
client's context. A client CANNOT reconnect to a context
262
after sending or receiving a Close method.
267
The reply code. The AMQ reply codes are defined in AMQ
272
The localised reply text. This text can be logged as an
273
aid to resolving issues.
279
When the close is provoked by a method exception, this
280
is the class of the method.
286
When the close is provoked by a method exception, this
287
is the ID of the method.
290
if self.transport is None:
295
args.write_short(reply_code)
296
args.write_shortstr(reply_text)
297
args.write_short(method_sig[0]) # class_id
298
args.write_short(method_sig[1]) # method_id
299
self._send_method((10, 60), args)
300
return self.wait(allowed_methods=[
301
(10, 61), # Connection.close_ok
305
def _close(self, args):
307
request a connection close
309
This method indicates that the sender wants to close the
310
connection. This may be due to internal conditions (e.g. a
311
forced shut-down) or due to an error handling a specific
312
method, i.e. an exception. When a close is due to an
313
exception, the sender provides the class and method id of the
314
method which caused the exception.
318
After sending this method any received method except the
319
Close-OK method MUST be discarded.
323
The peer sending this method MAY use a counter or timeout
324
to detect failure of the other peer to respond correctly
325
with the Close-OK method.
329
When a server receives the Close method from a client it
330
MUST delete all server-side resources associated with the
331
client's context. A client CANNOT reconnect to a context
332
after sending or receiving a Close method.
337
The reply code. The AMQ reply codes are defined in AMQ
342
The localised reply text. This text can be logged as an
343
aid to resolving issues.
349
When the close is provoked by a method exception, this
350
is the class of the method.
356
When the close is provoked by a method exception, this
357
is the ID of the method.
360
reply_code = args.read_short()
361
reply_text = args.read_shortstr()
362
class_id = args.read_short()
363
method_id = args.read_short()
367
raise AMQPConnectionException(reply_code, reply_text, (class_id, method_id))
370
def _x_close_ok(self):
372
confirm a connection close
374
This method confirms a Connection.Close method and tells the
375
recipient that it is safe to release resources for the
376
connection and close the socket.
380
A peer that detects a socket closure without having
381
received a Close-Ok handshake method SHOULD log the error.
384
self._send_method((10, 61))
388
def _close_ok(self, args):
390
confirm a connection close
392
This method confirms a Connection.Close method and tells the
393
recipient that it is safe to release resources for the
394
connection and close the socket.
398
A peer that detects a socket closure without having
399
received a Close-Ok handshake method SHOULD log the error.
405
def _x_open(self, virtual_host, capabilities='', insist=False):
    """
    open connection to virtual host

    This method opens a connection to a virtual host, which is a
    collection of resources, and acts to separate multiple
    application domains within a server.

    Sends Connection.Open (10, 40) and then waits for either
    Connection.Open-OK (10, 41) or Connection.Redirect (10, 50).
    The return value is whatever self.wait() hands back for the
    method that arrived.

    PARAMETERS:
        virtual_host: shortstr

            The name of the virtual host to work with.

        capabilities: shortstr

            required capabilities

            The client may specify a number of capability names,
            delimited by spaces.

        insist: written as a single bit

            insist on connecting to server

            In a configuration with multiple load-sharing servers,
            the server may respond to a Connection.Open method
            with a Connection.Redirect.  The insist option tells
            the server that the client is insisting on a
            connection to the specified server.

    """
    # NOTE(review): the damaged listing used 'args' without binding it;
    # AMQPWriter is the serializer this module imports and already uses
    # to build the login response - confirm against VCS.
    args = AMQPWriter()
    args.write_shortstr(virtual_host)
    args.write_shortstr(capabilities)
    args.write_bit(insist)
    self._send_method((10, 40), args)
    return self.wait(allowed_methods=[
        (10, 41),    # Connection.open_ok
        (10, 50),    # Connection.redirect
        ])
482
def _open_ok(self, args):
    """
    signal that the connection is ready

    This method signals to the client that the connection is ready.
    The handler stores the server-supplied host list on
    self.known_hosts and logs it at debug level.

    PARAMETERS:
        known_hosts: shortstr

    """
    self.known_hosts = args.read_shortstr()
    # Pass the value as a logging argument so the message is only
    # formatted when debug logging is actually enabled.
    AMQP_LOGGER.debug('Open OK! known_hosts [%s]', self.known_hosts)
    # NOTE(review): one line is missing here in the damaged listing;
    # an explicit "no redirect" result is assumed - confirm against VCS.
    return None
498
def _redirect(self, args):
    """
    asks the client to use a different server

    This method redirects the client to another server, based on
    the requested virtual host and/or capabilities.

    PARAMETERS:
        host: shortstr

            Specifies the server to connect to.  This is an IP
            address or a DNS name, optionally followed by a colon
            and a port number.

        known_hosts: shortstr

            Stored on self.known_hosts.

    """
    host = args.read_shortstr()
    self.known_hosts = args.read_shortstr()
    # Pass the values as logging arguments so the message is only
    # formatted when debug logging is actually enabled.
    AMQP_LOGGER.debug('Redirected to [%s], known_hosts [%s]',
        host, self.known_hosts)
    # NOTE(review): this return was absent from the damaged listing
    # (one-line numbering gap).  The connection setup code binds the
    # result of _x_open() to 'host' and loops on a redirect, so the
    # redirect target presumably has to be handed back - confirm.
    return host
532
def _secure(self, args):
534
security mechanism challenge
536
The SASL protocol works by exchanging challenges and responses
537
until both peers have received sufficient information to
538
authenticate each other. This method challenges the client to
539
provide more information.
544
security challenge data
546
Challenge information, a block of opaque binary data
547
passed to the security mechanism.
550
challenge = args.read_longstr()
553
def _x_secure_ok(self, response):
    """
    security mechanism response

    This method attempts to authenticate, passing a block of SASL
    data for the security mechanism at the server side.  It is sent
    as method (10, 21).

    PARAMETERS:
        response: longstr

            security response data

            A block of opaque data passed to the security
            mechanism.  The contents of this data are defined by
            the SASL security mechanism.

    """
    # NOTE(review): the damaged listing used 'args' without binding it;
    # AMQPWriter is the serializer this module imports and already uses
    # to build the login response - confirm against VCS.
    args = AMQPWriter()
    args.write_longstr(response)
    self._send_method((10, 21), args)
575
def _start(self, args):
    """
    start connection negotiation

    This method starts the connection negotiation process by
    telling the client the protocol version that the server
    proposes, along with a list of security mechanisms which the
    client can use for authentication.

    The handler records everything the server sent on the
    connection object and logs it at debug level.

    PARAMETERS:
        version_major: octet

            protocol major version

        version_minor: octet

            protocol minor version

        server_properties: table

        mechanisms: longstr

            available security mechanisms

            A list of the security mechanisms that the server
            supports, delimited by spaces.  Stored as a list on
            self.mechanisms.

        locales: longstr

            available message locales

            A list of the message locales that the server
            supports, delimited by spaces.  Stored as a list on
            self.locales.

    """
    self.version_major = args.read_octet()
    self.version_minor = args.read_octet()
    self.server_properties = args.read_table()
    self.mechanisms = args.read_longstr().split(' ')
    self.locales = args.read_longstr().split(' ')

    # Pass the values as logging arguments so the message is only
    # formatted when debug logging is actually enabled.
    AMQP_LOGGER.debug('Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s',
        self.version_major, self.version_minor,
        str(self.server_properties), self.mechanisms, self.locales)
651
def _x_start_ok(self, client_properties, mechanism, response, locale):
    """
    select security mechanism and locale

    This method selects a SASL security mechanism.  It is sent as
    method (10, 11).

    PARAMETERS:
        client_properties: table

        mechanism: shortstr

            selected security mechanism

            A single security mechanisms selected by the client,
            which must be one of those specified by the server.

        response: longstr

            security response data

            A block of opaque data passed to the security
            mechanism.  The contents of this data are defined by
            the SASL security mechanism.  For the PLAIN security
            mechanism this is defined as a field table holding two
            fields, LOGIN and PASSWORD.

        locale: shortstr

            selected message locale

            A single message locale selected by the client, which
            must be one of those specified by the server.

    """
    # NOTE(review): the damaged listing used 'args' without binding it;
    # AMQPWriter is the serializer this module imports and already uses
    # to build the login response - confirm against VCS.
    args = AMQPWriter()
    args.write_table(client_properties)
    args.write_shortstr(mechanism)
    args.write_longstr(response)
    args.write_shortstr(locale)
    self._send_method((10, 11), args)
709
def _tune(self, args):
711
propose connection tuning parameters
713
This method proposes a set of connection configuration values
714
to the client. The client can accept and/or adjust these.
719
proposed maximum channels
721
The maximum total number of channels that the server
722
allows per connection. Zero means that the server does
723
not impose a fixed limit, but the number of allowed
724
channels may be limited by available server resources.
728
proposed maximum frame size
730
The largest frame size that the server proposes for
731
the connection. The client can negotiate a lower
732
value. Zero means that the server does not impose any
733
specific limit but may reject very large frames if it
734
cannot allocate resources for them.
738
Until the frame-max has been negotiated, both
739
peers MUST accept frames of up to 4096 octets
740
large. The minimum non-zero value for the frame-
745
desired heartbeat delay
747
The delay, in seconds, of the connection heartbeat
748
that the server wants. Zero means the server does not
752
self.channel_max = args.read_short() or self.channel_max
753
self.frame_max = args.read_long() or self.frame_max
754
self.method_writer.frame_max = self.frame_max
755
self.heartbeat = args.read_short()
757
self._x_tune_ok(self.channel_max, self.frame_max, 0)
760
def _x_tune_ok(self, channel_max, frame_max, heartbeat):
    """
    negotiate connection tuning parameters

    This method sends the client's connection tuning parameters to
    the server as method (10, 31).  Once the method has been sent,
    self._wait_tune_ok is cleared so the loop that waits for tuning
    to finish can stop.

    PARAMETERS:
        channel_max: short

            negotiated maximum channels

            The maximum total number of channels that the client
            will use per connection.  May not be higher than the
            value specified by the server.

        frame_max: long

            negotiated maximum frame size

            The largest frame size that the client and server will
            use for the connection.  Zero means that the client
            does not impose any specific limit but may reject very
            large frames if it cannot allocate resources for them.

        heartbeat: short

            desired heartbeat delay

            The delay, in seconds, of the connection heartbeat
            that the client wants.

    """
    # NOTE(review): the damaged listing used 'args' without binding it;
    # AMQPWriter is the serializer this module imports and already uses
    # to build the login response - confirm against VCS.
    args = AMQPWriter()
    args.write_short(channel_max)
    args.write_long(frame_max)
    args.write_short(heartbeat)
    self._send_method((10, 31), args)
    self._wait_tune_ok = False