5
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
7
# This library is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This library is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this library; if not, write to the Free Software
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
22
from Queue import Queue
24
from abstract_channel import AbstractChannel
25
from exceptions import *
26
from serialization import AMQPWriter
29
'Channel', # here mainly so it shows in in pydoc
32
AMQP_LOGGER = logging.getLogger('amqplib')
35
class Channel(AbstractChannel):
39
The channel class provides methods for a client to establish a
40
virtual connection - a channel - to a server and for both peers to
41
operate the virtual connection thereafter.
45
channel = open-channel *use-channel close-channel
46
open-channel = C:OPEN S:OPEN-OK
47
use-channel = C:FLOW S:FLOW-OK
51
close-channel = C:CLOSE S:CLOSE-OK
55
def __init__(self, connection, channel_id=None, auto_decode=True):
57
Create a channel bound to a connection and using the specified
58
numeric channel_id, and open on the server.
60
The 'auto_decode' parameter (defaults to True), indicates
61
whether the library should attempt to decode the body
62
of Messages to a Unicode string if there's a 'content_encoding'
63
property for the message. If there's no 'content_encoding'
64
property, or the decode raises an Exception, the plain string
65
is left as the message body.
68
if channel_id is None:
69
channel_id = connection._get_free_channel_id()
70
AMQP_LOGGER.debug('using channel_id: %d' % channel_id)
72
super(Channel, self).__init__(connection, channel_id)
74
self.default_ticket = 0
76
self.active = True # Flow control
78
self.returned_messages = Queue()
80
self.auto_decode = auto_decode
87
Tear down this object, after we've agreed to close with the server.
90
AMQP_LOGGER.debug('Closed channel #%d' % self.channel_id)
92
del self.connection.channels[self.channel_id]
93
self.channel_id = self.connection = None
99
def _alert(self, args):
101
This method allows the server to send a non-fatal warning to
102
the client. This is used for methods that are normally
103
asynchronous and thus do not have confirmations, and for which
104
the server may detect errors that need to be reported. Fatal
105
errors are handled as channel or connection exceptions; non-
106
fatal errors are sent through this method.
111
The reply code. The AMQ reply codes are defined in AMQ
116
The localised reply text. This text can be logged as an
117
aid to resolving issues.
121
detailed information for warning
123
A set of fields that provide more information about
124
the problem. The meaning of these fields are defined
125
on a per-reply-code basis (TO BE DEFINED).
128
reply_code = args.read_short()
129
reply_text = args.read_shortstr()
130
details = args.read_table()
132
self.alerts.put((reply_code, reply_text, details))
135
def close(self, reply_code=0, reply_text='', method_sig=(0, 0)):
137
request a channel close
139
This method indicates that the sender wants to close the
140
channel. This may be due to internal conditions (e.g. a forced
141
shut-down) or due to an error handling a specific method, i.e.
142
an exception. When a close is due to an exception, the sender
143
provides the class and method id of the method which caused
148
After sending this method any received method except
149
Channel.Close-OK MUST be discarded.
153
The peer sending this method MAY use a counter or timeout
154
to detect failure of the other peer to respond correctly
155
with Channel.Close-OK..
160
The reply code. The AMQ reply codes are defined in AMQ
165
The localised reply text. This text can be logged as an
166
aid to resolving issues.
172
When the close is provoked by a method exception, this
173
is the class of the method.
179
When the close is provoked by a method exception, this
180
is the ID of the method.
188
args.write_short(reply_code)
189
args.write_shortstr(reply_text)
190
args.write_short(method_sig[0]) # class_id
191
args.write_short(method_sig[1]) # method_id
192
self._send_method((20, 40), args)
193
return self.wait(allowed_methods=[
194
(20, 41), # Channel.close_ok
198
def _close(self, args):
200
request a channel close
202
This method indicates that the sender wants to close the
203
channel. This may be due to internal conditions (e.g. a forced
204
shut-down) or due to an error handling a specific method, i.e.
205
an exception. When a close is due to an exception, the sender
206
provides the class and method id of the method which caused
211
After sending this method any received method except
212
Channel.Close-OK MUST be discarded.
216
The peer sending this method MAY use a counter or timeout
217
to detect failure of the other peer to respond correctly
218
with Channel.Close-OK..
223
The reply code. The AMQ reply codes are defined in AMQ
228
The localised reply text. This text can be logged as an
229
aid to resolving issues.
235
When the close is provoked by a method exception, this
236
is the class of the method.
242
When the close is provoked by a method exception, this
243
is the ID of the method.
246
reply_code = args.read_short()
247
reply_text = args.read_shortstr()
248
class_id = args.read_short()
249
method_id = args.read_short()
254
# def close_ok(self):
256
# confirm a channel close
258
# This method confirms a Channel.Close method and tells the
259
# recipient that it is safe to release resources for the channel
260
# and close the socket.
264
# A peer that detects a socket closure without having
265
# received a Channel.Close-Ok handshake method SHOULD log
269
self._send_method((20, 41))
272
raise AMQPChannelException(reply_code, reply_text,
273
(class_id, method_id))
276
def _close_ok(self, args):
278
confirm a channel close
280
This method confirms a Channel.Close method and tells the
281
recipient that it is safe to release resources for the channel
282
and close the socket.
286
A peer that detects a socket closure without having
287
received a Channel.Close-Ok handshake method SHOULD log
294
def flow(self, active):
296
enable/disable flow from peer
298
This method asks the peer to pause or restart the flow of
299
content data. This is a simple flow-control mechanism that a
300
peer can use to avoid oveflowing its queues or otherwise
301
finding itself receiving more messages than it can process.
302
Note that this method is not intended for window control. The
303
peer that receives a request to stop sending content should
304
finish sending the current content, if any, and then wait
305
until it receives a Flow restart method.
309
When a new channel is opened, it is active. Some
310
applications assume that channels are inactive until
311
started. To emulate this behaviour a client MAY open the
312
channel, then pause it.
316
When sending content data in multiple frames, a peer
317
SHOULD monitor the channel for incoming methods and
318
respond to a Channel.Flow as rapidly as possible.
322
A peer MAY use the Channel.Flow method to throttle
323
incoming content data for internal reasons, for example,
324
when exchangeing data over a slower connection.
328
The peer that requests a Channel.Flow method MAY
329
disconnect and/or ban a peer that does not respect the
335
start/stop content frames
337
If True, the peer starts sending content frames. If
338
False, the peer stops sending content frames.
342
args.write_bit(active)
343
self._send_method((20, 20), args)
344
return self.wait(allowed_methods=[
345
(20, 21), # Channel.flow_ok
349
def _flow(self, args):
351
enable/disable flow from peer
353
This method asks the peer to pause or restart the flow of
354
content data. This is a simple flow-control mechanism that a
355
peer can use to avoid oveflowing its queues or otherwise
356
finding itself receiving more messages than it can process.
357
Note that this method is not intended for window control. The
358
peer that receives a request to stop sending content should
359
finish sending the current content, if any, and then wait
360
until it receives a Flow restart method.
364
When a new channel is opened, it is active. Some
365
applications assume that channels are inactive until
366
started. To emulate this behaviour a client MAY open the
367
channel, then pause it.
371
When sending content data in multiple frames, a peer
372
SHOULD monitor the channel for incoming methods and
373
respond to a Channel.Flow as rapidly as possible.
377
A peer MAY use the Channel.Flow method to throttle
378
incoming content data for internal reasons, for example,
379
when exchangeing data over a slower connection.
383
The peer that requests a Channel.Flow method MAY
384
disconnect and/or ban a peer that does not respect the
390
start/stop content frames
392
If True, the peer starts sending content frames. If
393
False, the peer stops sending content frames.
396
self.active = args.read_bit()
398
self._x_flow_ok(self.active)
401
def _x_flow_ok(self, active):
403
confirm a flow method
405
Confirms to the peer that a flow command was received and
413
Confirms the setting of the processed flow method:
414
True means the peer will start sending or continue
415
to send content frames; False means it will not.
419
args.write_bit(active)
420
self._send_method((20, 21), args)
423
def _flow_ok(self, args):
425
confirm a flow method
427
Confirms to the peer that a flow command was received and
435
Confirms the setting of the processed flow method:
436
True means the peer will start sending or continue
437
to send content frames; False means it will not.
440
return args.read_bit()
443
def _x_open(self, out_of_band=''):
445
open a channel for use
447
This method opens a virtual connection (a channel).
451
This method MUST NOT be called when the channel is already
455
out_of_band: shortstr
459
Configures out-of-band transfers on this channel. The
460
syntax and meaning of this field will be formally
461
defined at a later date.
468
args.write_shortstr(out_of_band)
469
self._send_method((20, 10), args)
470
return self.wait(allowed_methods=[
471
(20, 11), # Channel.open_ok
475
def _open_ok(self, args):
477
signal that the channel is ready
479
This method signals to the client that the channel is ready
484
AMQP_LOGGER.debug('Channel open')
492
# work with access tickets
494
# The protocol control access to server resources using access
495
# tickets. A client must explicitly request access tickets before
496
# doing work. An access ticket grants a client the right to use a
497
# specific set of resources - called a "realm" - in specific ways.
501
# access = C:REQUEST S:REQUEST-OK
505
def access_request(self, realm, exclusive=False,
506
passive=False, active=False, write=False, read=False):
508
request an access ticket
510
This method requests an access ticket for an access realm. The
511
server responds by granting the access ticket. If the client
512
does not have access rights to the requested realm this causes
513
a connection exception. Access tickets are a per-channel
518
The realm name MUST start with either "/data" (for
519
application resources) or "/admin" (for server
520
administration resources). If the realm starts with any
521
other path, the server MUST raise a connection exception
522
with reply code 403 (access refused).
526
The server MUST implement the /data realm and MAY
527
implement the /admin realm. The mapping of resources to
528
realms is not defined in the protocol - this is a server-
529
side configuration issue.
534
name of requested realm
538
If the specified realm is not known to the server,
539
the server must raise a channel exception with
540
reply code 402 (invalid path).
544
request exclusive access
546
Request exclusive access to the realm. If the server
547
cannot grant this - because there are other active
548
tickets for the realm - it raises a channel exception.
552
request passive access
554
Request message passive access to the specified access
555
realm. Passive access lets a client get information
556
about resources in the realm but not to make any
561
request active access
563
Request message active access to the specified access
564
realm. Acvtive access lets a client get create and
565
delete resources in the realm.
571
Request write access to the specified access realm.
572
Write access lets a client publish messages to all
573
exchanges in the realm.
579
Request read access to the specified access realm.
580
Read access lets a client consume messages from queues
583
The most recently requested ticket is used as the channel's
584
default ticket for any method that requires a ticket.
588
args.write_shortstr(realm)
589
args.write_bit(exclusive)
590
args.write_bit(passive)
591
args.write_bit(active)
592
args.write_bit(write)
594
self._send_method((30, 10), args)
595
return self.wait(allowed_methods=[
596
(30, 11), # Channel.access_request_ok
600
def _access_request_ok(self, args):
602
grant access to server resources
604
This method provides the client with an access ticket. The
605
access ticket is valid within the current channel and for the
606
lifespan of the channel.
610
The client MUST NOT use access tickets except within the
611
same channel as originally granted.
615
The server MUST isolate access tickets per channel and
616
treat an attempt by a client to mix these as a connection
623
self.default_ticket = args.read_short()
624
return self.default_ticket
632
# work with exchanges
634
# Exchanges match and distribute messages across queues.
635
# Exchanges can be configured in the server or created at runtime.
639
# exchange = C:DECLARE S:DECLARE-OK
640
# / C:DELETE S:DELETE-OK
644
# The server MUST implement the direct and fanout exchange
645
# types, and predeclare the corresponding exchanges named
646
# amq.direct and amq.fanout in each virtual host. The server
647
# MUST also predeclare a direct exchange to act as the default
648
# exchange for content Publish methods and for default queue
653
# The server SHOULD implement the topic exchange type, and
654
# predeclare the corresponding exchange named amq.topic in
659
# The server MAY implement the system exchange type, and
660
# predeclare the corresponding exchanges named amq.system in
661
# each virtual host. If the client attempts to bind a queue to
662
# the system exchange, the server MUST raise a connection
663
# exception with reply code 507 (not allowed).
667
# The default exchange MUST be defined as internal, and be
668
# inaccessible to the client except by specifying an empty
669
# exchange name in a content Publish method. That is, the
670
# server MUST NOT let clients make explicit bindings to this
675
def exchange_declare(self, exchange, type, passive=False, durable=False,
676
auto_delete=True, internal=False, nowait=False,
677
arguments=None, ticket=None):
679
declare exchange, create if needed
681
This method creates an exchange if it does not already exist,
682
and if the exchange exists, verifies that it is of the correct
687
The server SHOULD support a minimum of 16 exchanges per
688
virtual host and ideally, impose no limit except as
689
defined by available resources.
696
Exchange names starting with "amq." are reserved
697
for predeclared and standardised exchanges. If
698
the client attempts to create an exchange starting
699
with "amq.", the server MUST raise a channel
700
exception with reply code 403 (access refused).
706
Each exchange belongs to one of a set of exchange
707
types implemented by the server. The exchange types
708
define the functionality of the exchange - i.e. how
709
messages are routed through it. It is not valid or
710
meaningful to attempt to change the type of an
715
If the exchange already exists with a different
716
type, the server MUST raise a connection exception
717
with a reply code 507 (not allowed).
721
If the server does not support the requested
722
exchange type it MUST raise a connection exception
723
with a reply code 503 (command invalid).
727
do not create exchange
729
If set, the server will not create the exchange. The
730
client can use this to check whether an exchange
731
exists without modifying the server state.
735
If set, and the exchange does not already exist,
736
the server MUST raise a channel exception with
737
reply code 404 (not found).
741
request a durable exchange
743
If set when creating a new exchange, the exchange will
744
be marked as durable. Durable exchanges remain active
745
when a server restarts. Non-durable exchanges
746
(transient exchanges) are purged if/when a server
751
The server MUST support both durable and transient
756
The server MUST ignore the durable field if the
757
exchange already exists.
761
auto-delete when unused
763
If set, the exchange is deleted when all queues have
768
The server SHOULD allow for a reasonable delay
769
between the point when it determines that an
770
exchange is not being used (or no longer used),
771
and the point when it deletes the exchange. At
772
the least it must allow a client to create an
773
exchange and then bind a queue to it, with a small
774
but non-zero delay between these two actions.
778
The server MUST ignore the auto-delete field if
779
the exchange already exists.
783
create internal exchange
785
If set, the exchange may not be used directly by
786
publishers, but only when bound to other exchanges.
787
Internal exchanges are used to construct wiring that
788
is not visible to applications.
792
do not send a reply method
794
If set, the server will not respond to the method. The
795
client should not wait for a reply method. If the
796
server could not complete the method it will raise a
797
channel or connection exception.
801
arguments for declaration
803
A set of arguments for the declaration. The syntax and
804
semantics of these arguments depends on the server
805
implementation. This field is ignored if passive is
810
When a client defines a new exchange, this belongs to
811
the access realm of the ticket used. All further work
812
done with that exchange must be done with an access
813
ticket for the same realm.
817
The client MUST provide a valid access ticket
818
giving "active" access to the realm in which the
819
exchange exists or will be created, or "passive"
820
access if the if-exists flag is set.
823
if arguments is None:
827
if ticket is not None:
828
args.write_short(ticket)
830
args.write_short(self.default_ticket)
831
args.write_shortstr(exchange)
832
args.write_shortstr(type)
833
args.write_bit(passive)
834
args.write_bit(durable)
835
args.write_bit(auto_delete)
836
args.write_bit(internal)
837
args.write_bit(nowait)
838
args.write_table(arguments)
839
self._send_method((40, 10), args)
842
return self.wait(allowed_methods=[
843
(40, 11), # Channel.exchange_declare_ok
847
def _exchange_declare_ok(self, args):
849
confirms an exchange declaration
851
This method confirms a Declare method and confirms the name of
852
the exchange, essential for automatically-named exchanges.
858
def exchange_delete(self, exchange, if_unused=False,
859
nowait=False, ticket=None):
863
This method deletes an exchange. When an exchange is deleted
864
all queue bindings on the exchange are cancelled.
871
The exchange MUST exist. Attempting to delete a
872
non-existing exchange causes a channel exception.
876
delete only if unused
878
If set, the server will only delete the exchange if it
879
has no queue bindings. If the exchange has queue
880
bindings the server does not delete it but raises a
881
channel exception instead.
885
If set, the server SHOULD delete the exchange but
886
only if it has no queue bindings.
890
If set, the server SHOULD raise a channel
891
exception if the exchange is in use.
895
do not send a reply method
897
If set, the server will not respond to the method. The
898
client should not wait for a reply method. If the
899
server could not complete the method it will raise a
900
channel or connection exception.
906
The client MUST provide a valid access ticket
907
giving "active" access rights to the exchange's
912
if ticket is not None:
913
args.write_short(ticket)
915
args.write_short(self.default_ticket)
916
args.write_shortstr(exchange)
917
args.write_bit(if_unused)
918
args.write_bit(nowait)
919
self._send_method((40, 20), args)
922
return self.wait(allowed_methods=[
923
(40, 21), # Channel.exchange_delete_ok
927
def _exchange_delete_ok(self, args):
929
confirm deletion of an exchange
931
This method confirms the deletion of an exchange.
944
# Queues store and forward messages. Queues can be configured in
945
# the server or created at runtime. Queues must be attached to at
946
# least one exchange in order to receive messages from publishers.
950
# queue = C:DECLARE S:DECLARE-OK
952
# / C:PURGE S:PURGE-OK
953
# / C:DELETE S:DELETE-OK
957
# A server MUST allow any content class to be sent to any
958
# queue, in any mix, and queue and delivery these content
959
# classes independently. Note that all methods that fetch
960
# content off queues are specific to a given content class.
964
def queue_bind(self, queue, exchange, routing_key='',
965
nowait=False, arguments=None, ticket=None):
967
bind queue to an exchange
969
This method binds a queue to an exchange. Until a queue is
970
bound it will not receive any messages. In a classic
971
messaging model, store-and-forward queues are bound to a dest
972
exchange and subscription queues are bound to a dest_wild
977
A server MUST allow ignore duplicate bindings - that is,
978
two or more bind methods for a specific queue, with
979
identical arguments - without treating these as an error.
983
If a bind fails, the server MUST raise a connection
988
The server MUST NOT allow a durable queue to bind to a
989
transient exchange. If the client attempts this the server
990
MUST raise a channel exception.
994
Bindings for durable queues are automatically durable and
995
the server SHOULD restore such bindings after a server
1000
If the client attempts to an exchange that was declared as
1001
internal, the server MUST raise a connection exception
1002
with reply code 530 (not allowed).
1006
The server SHOULD support at least 4 bindings per queue,
1007
and ideally, impose no limit except as defined by
1008
available resources.
1013
Specifies the name of the queue to bind. If the queue
1014
name is empty, refers to the current queue for the
1015
channel, which is the last declared queue.
1019
If the client did not previously declare a queue,
1020
and the queue name in this method is empty, the
1021
server MUST raise a connection exception with
1022
reply code 530 (not allowed).
1026
If the queue does not exist the server MUST raise
1027
a channel exception with reply code 404 (not
1032
The name of the exchange to bind to.
1036
If the exchange does not exist the server MUST
1037
raise a channel exception with reply code 404 (not
1040
routing_key: shortstr
1044
Specifies the routing key for the binding. The
1045
routing key is used for routing messages depending on
1046
the exchange configuration. Not all exchanges use a
1047
routing key - refer to the specific exchange
1048
documentation. If the routing key is empty and the
1049
queue name is empty, the routing key will be the
1050
current queue for the channel, which is the last
1055
do not send a reply method
1057
If set, the server will not respond to the method. The
1058
client should not wait for a reply method. If the
1059
server could not complete the method it will raise a
1060
channel or connection exception.
1064
arguments for binding
1066
A set of arguments for the binding. The syntax and
1067
semantics of these arguments depends on the exchange
1072
The client provides a valid access ticket giving
1073
"active" access rights to the queue's access realm.
1076
if arguments is None:
1080
if ticket is not None:
1081
args.write_short(ticket)
1083
args.write_short(self.default_ticket)
1084
args.write_shortstr(queue)
1085
args.write_shortstr(exchange)
1086
args.write_shortstr(routing_key)
1087
args.write_bit(nowait)
1088
args.write_table(arguments)
1089
self._send_method((50, 20), args)
1092
return self.wait(allowed_methods=[
1093
(50, 21), # Channel.queue_bind_ok
1097
def _queue_bind_ok(self, args):
1099
confirm bind successful
1101
This method confirms that the bind was successful.
1107
def queue_declare(self, queue='', passive=False, durable=False,
1108
exclusive=False, auto_delete=True, nowait=False,
1109
arguments=None, ticket=None):
1111
declare queue, create if needed
1113
This method creates or checks a queue. When creating a new
1114
queue the client can specify various properties that control
1115
the durability of the queue and its contents, and the level of
1116
sharing for the queue.
1120
The server MUST create a default binding for a newly-
1121
created queue to the default exchange, which is an
1122
exchange of type 'direct'.
1126
The server SHOULD support a minimum of 256 queues per
1127
virtual host and ideally, impose no limit except as
1128
defined by available resources.
1135
The queue name MAY be empty, in which case the
1136
server MUST create a new queue with a unique
1137
generated name and return this to the client in
1138
the Declare-Ok method.
1142
Queue names starting with "amq." are reserved for
1143
predeclared and standardised server queues. If
1144
the queue name starts with "amq." and the passive
1145
option is False, the server MUST raise a connection
1146
exception with reply code 403 (access refused).
1152
If set, the server will not create the queue. The
1153
client can use this to check whether a queue exists
1154
without modifying the server state.
1158
If set, and the queue does not already exist, the
1159
server MUST respond with a reply code 404 (not
1160
found) and raise a channel exception.
1164
request a durable queue
1166
If set when creating a new queue, the queue will be
1167
marked as durable. Durable queues remain active when
1168
a server restarts. Non-durable queues (transient
1169
queues) are purged if/when a server restarts. Note
1170
that durable queues do not necessarily hold persistent
1171
messages, although it does not make sense to send
1172
persistent messages to a transient queue.
1176
The server MUST recreate the durable queue after a
1181
The server MUST support both durable and transient
1186
The server MUST ignore the durable field if the
1187
queue already exists.
1191
request an exclusive queue
1193
Exclusive queues may only be consumed from by the
1194
current connection. Setting the 'exclusive' flag
1195
always implies 'auto-delete'.
1199
The server MUST support both exclusive (private)
1200
and non-exclusive (shared) queues.
1204
The server MUST raise a channel exception if
1205
'exclusive' is specified and the queue already
1206
exists and is owned by a different connection.
1208
auto_delete: boolean
1210
auto-delete queue when unused
1212
If set, the queue is deleted when all consumers have
1213
finished using it. Last consumer can be cancelled
1214
either explicitly or because its channel is closed. If
1215
there was no consumer ever on the queue, it won't be
1220
The server SHOULD allow for a reasonable delay
1221
between the point when it determines that a queue
1222
is not being used (or no longer used), and the
1223
point when it deletes the queue. At the least it
1224
must allow a client to create a queue and then
1225
create a consumer to read from it, with a small
1226
but non-zero delay between these two actions. The
1227
server should equally allow for clients that may
1228
be disconnected prematurely, and wish to re-
1229
consume from the same queue without losing
1230
messages. We would recommend a configurable
1231
timeout, with a suitable default value being one
1236
The server MUST ignore the auto-delete field if
1237
the queue already exists.
1241
do not send a reply method
1243
If set, the server will not respond to the method. The
1244
client should not wait for a reply method. If the
1245
server could not complete the method it will raise a
1246
channel or connection exception.
1250
arguments for declaration
1252
A set of arguments for the declaration. The syntax and
1253
semantics of these arguments depends on the server
1254
implementation. This field is ignored if passive is
1259
When a client defines a new queue, this belongs to the
1260
access realm of the ticket used. All further work
1261
done with that queue must be done with an access
1262
ticket for the same realm.
1264
The client provides a valid access ticket giving
1265
"active" access to the realm in which the queue exists
1266
or will be created, or "passive" access if the if-
1269
Returns a tuple containing 3 items:
1270
the name of the queue (essential for automatically-named queues)
1275
if arguments is None:
1279
if ticket is not None:
1280
args.write_short(ticket)
1282
args.write_short(self.default_ticket)
1283
args.write_shortstr(queue)
1284
args.write_bit(passive)
1285
args.write_bit(durable)
1286
args.write_bit(exclusive)
1287
args.write_bit(auto_delete)
1288
args.write_bit(nowait)
1289
args.write_table(arguments)
1290
self._send_method((50, 10), args)
1293
return self.wait(allowed_methods=[
1294
(50, 11), # Channel.queue_declare_ok
1298
def _queue_declare_ok(self, args):
1300
confirms a queue definition
1302
This method confirms a Declare method and confirms the name of
1303
the queue, essential for automatically-named queues.
1308
Reports the name of the queue. If the server generated
1309
a queue name, this field contains that name.
1313
number of messages in queue
1315
Reports the number of messages in the queue, which
1316
will be zero for newly-created queues.
1318
consumer_count: long
1322
Reports the number of active consumers for the queue.
1323
Note that consumers can suspend activity
1324
(Channel.Flow) in which case they do not appear in
1328
queue = args.read_shortstr()
1329
message_count = args.read_long()
1330
consumer_count = args.read_long()
1332
return queue, message_count, consumer_count
1335
def queue_delete(self, queue='', if_unused=False, if_empty=False,
1336
nowait=False, ticket=None):
1340
This method deletes a queue. When a queue is deleted any
1341
pending messages are sent to a dead-letter queue if this is
1342
defined in the server configuration, and all consumers on the
1343
queue are cancelled.
1347
The server SHOULD use a dead-letter queue to hold messages
1348
that were pending on a deleted queue, and MAY provide
1349
facilities for a system administrator to move these
1350
messages back to an active queue.
1355
Specifies the name of the queue to delete. If the
1356
queue name is empty, refers to the current queue for
1357
the channel, which is the last declared queue.
1361
If the client did not previously declare a queue,
1362
and the queue name in this method is empty, the
1363
server MUST raise a connection exception with
1364
reply code 530 (not allowed).
1368
The queue must exist. Attempting to delete a non-
1369
existing queue causes a channel exception.
1373
delete only if unused
1375
If set, the server will only delete the queue if it
1376
has no consumers. If the queue has consumers the
1377
server does does not delete it but raises a channel
1382
The server MUST respect the if-unused flag when
1387
delete only if empty
1389
If set, the server will only delete the queue if it
1390
has no messages. If the queue is not empty the server
1391
raises a channel exception.
1395
do not send a reply method
1397
If set, the server will not respond to the method. The
1398
client should not wait for a reply method. If the
1399
server could not complete the method it will raise a
1400
channel or connection exception.
1404
The client provides a valid access ticket giving
1405
"active" access rights to the queue's access realm.
1409
if ticket is not None:
1410
args.write_short(ticket)
1412
args.write_short(self.default_ticket)
1414
args.write_shortstr(queue)
1415
args.write_bit(if_unused)
1416
args.write_bit(if_empty)
1417
args.write_bit(nowait)
1418
self._send_method((50, 40), args)
1421
return self.wait(allowed_methods=[
1422
(50, 41), # Channel.queue_delete_ok
1426
def _queue_delete_ok(self, args):
1428
confirm deletion of a queue
1430
This method confirms the deletion of a queue.
1435
number of messages purged
1437
Reports the number of messages purged.
1440
return args.read_long()
1443
def queue_purge(self, queue='', nowait=False, ticket=None):
1447
This method removes all messages from a queue. It does not
1448
cancel consumers. Purged messages are deleted without any
1449
formal "undo" mechanism.
1453
A call to purge MUST result in an empty queue.
1457
On transacted channels the server MUST not purge messages
1458
that have already been sent to a client but not yet
1463
The server MAY implement a purge queue or log that allows
1464
system administrators to recover accidentally-purged
1465
messages. The server SHOULD NOT keep purged messages in
1466
the same storage spaces as the live messages since the
1467
volumes of purged messages may get very large.
1472
Specifies the name of the queue to purge. If the
1473
queue name is empty, refers to the current queue for
1474
the channel, which is the last declared queue.
1478
If the client did not previously declare a queue,
1479
and the queue name in this method is empty, the
1480
server MUST raise a connection exception with
1481
reply code 530 (not allowed).
1485
The queue must exist. Attempting to purge a non-
1486
existing queue causes a channel exception.
1490
do not send a reply method
1492
If set, the server will not respond to the method. The
1493
client should not wait for a reply method. If the
1494
server could not complete the method it will raise a
1495
channel or connection exception.
1499
The access ticket must be for the access realm that
1504
The client MUST provide a valid access ticket
1505
giving "read" access rights to the queue's access
1506
realm. Note that purging a queue is equivalent to
1507
reading all messages and discarding them.
1509
if nowait is False, returns a message_count
1513
if ticket is not None:
1514
args.write_short(ticket)
1516
args.write_short(self.default_ticket)
1517
args.write_shortstr(queue)
1518
args.write_bit(nowait)
1519
self._send_method((50, 30), args)
1522
return self.wait(allowed_methods=[
1523
(50, 31), # Channel.queue_purge_ok
1527
def _queue_purge_ok(self, args):
1529
confirms a queue purge
1531
This method confirms the purge of a queue.
1536
number of messages purged
1538
Reports the number of messages purged.
1541
return args.read_long()
1549
# work with basic content
1551
# The Basic class provides methods that support an industry-
1552
# standard messaging model.
1556
# basic = C:QOS S:QOS-OK
1557
# / C:CONSUME S:CONSUME-OK
1558
# / C:CANCEL S:CANCEL-OK
1559
# / C:PUBLISH content
1560
# / S:RETURN content
1561
# / S:DELIVER content
1562
# / C:GET ( S:GET-OK content / S:GET-EMPTY )
1568
# The server SHOULD respect the persistent property of basic
1569
# messages and SHOULD make a best-effort to hold persistent
1570
# basic messages on a reliable storage mechanism.
1574
# The server MUST NOT discard a persistent basic message in
1575
# case of a queue overflow. The server MAY use the
1576
# Channel.Flow method to slow or stop a basic message
1577
# publisher when necessary.
1581
# The server MAY overflow non-persistent basic messages to
1582
# persistent storage and MAY discard or dead-letter non-
1583
# persistent basic messages on a priority basis if the queue
1584
# size exceeds some configured limit.
1588
# The server MUST implement at least 2 priority levels for
1589
# basic messages, where priorities 0-4 and 5-9 are treated as
1590
# two distinct levels. The server MAY implement up to 10
1595
# The server MUST deliver messages of the same priority in
1596
# order irrespective of their individual persistence.
1600
# The server MUST support both automatic and explicit
1601
# acknowledgements on Basic content.
1604
def basic_ack(self, delivery_tag, multiple=False):
1606
acknowledge one or more messages
1608
This method acknowledges one or more messages delivered via
1609
the Deliver or Get-Ok methods. The client can ask to confirm
1610
a single message or a set of messages up to and including a
1614
delivery_tag: longlong
1616
server-assigned delivery tag
1618
The server-assigned and channel-specific delivery tag
1622
The delivery tag is valid only within the channel
1623
from which the message was received. I.e. a client
1624
MUST NOT receive a message on one channel and then
1625
acknowledge it on another.
1629
The server MUST NOT use a zero value for delivery
1630
tags. Zero is reserved for client use, meaning "all
1631
messages so far received".
1635
acknowledge multiple messages
1637
If set to True, the delivery tag is treated as "up to
1638
and including", so that the client can acknowledge
1639
multiple messages with a single method. If set to
1640
False, the delivery tag refers to a single message.
1641
If the multiple field is True, and the delivery tag
1642
is zero, tells the server to acknowledge all
1643
outstanding mesages.
1647
The server MUST validate that a non-zero delivery-
1648
tag refers to an delivered message, and raise a
1649
channel exception if this is not the case.
1653
args.write_longlong(delivery_tag)
1654
args.write_bit(multiple)
1655
self._send_method((60, 80), args)
1658
def basic_cancel(self, consumer_tag, nowait=False):
1660
end a queue consumer
1662
This method cancels a consumer. This does not affect already
1663
delivered messages, but it does mean the server will not send
1664
any more messages for that consumer. The client may receive
1665
an abitrary number of messages in between sending the cancel
1666
method and receiving the cancel-ok reply.
1670
If the queue no longer exists when the client sends a
1671
cancel command, or the consumer has been cancelled for
1672
other reasons, this command has no effect.
1675
consumer_tag: shortstr
1679
Identifier for the consumer, valid within the current
1684
The consumer tag is valid only within the channel
1685
from which the consumer was created. I.e. a client
1686
MUST NOT create a consumer in one channel and then
1691
do not send a reply method
1693
If set, the server will not respond to the method. The
1694
client should not wait for a reply method. If the
1695
server could not complete the method it will raise a
1696
channel or connection exception.
1700
args.write_shortstr(consumer_tag)
1701
args.write_bit(nowait)
1702
self._send_method((60, 30), args)
1703
return self.wait(allowed_methods=[
1704
(60, 31), # Channel.basic_cancel_ok
1708
def _basic_cancel_ok(self, args):
1710
confirm a cancelled consumer
1712
This method confirms that the cancellation was completed.
1715
consumer_tag: shortstr
1719
Identifier for the consumer, valid within the current
1724
The consumer tag is valid only within the channel
1725
from which the consumer was created. I.e. a client
1726
MUST NOT create a consumer in one channel and then
1730
consumer_tag = args.read_shortstr()
1731
del self.callbacks[consumer_tag]
1734
def basic_consume(self, queue='', consumer_tag='', no_local=False,
1735
no_ack=False, exclusive=False, nowait=False,
1736
callback=None, ticket=None):
1738
start a queue consumer
1740
This method asks the server to start a "consumer", which is a
1741
transient request for messages from a specific queue.
1742
Consumers last as long as the channel they were created on, or
1743
until the client cancels them.
1747
The server SHOULD support at least 16 consumers per queue,
1748
unless the queue was declared as private, and ideally,
1749
impose no limit except as defined by available resources.
1754
Specifies the name of the queue to consume from. If
1755
the queue name is null, refers to the current queue
1756
for the channel, which is the last declared queue.
1760
If the client did not previously declare a queue,
1761
and the queue name in this method is empty, the
1762
server MUST raise a connection exception with
1763
reply code 530 (not allowed).
1765
consumer_tag: shortstr
1767
Specifies the identifier for the consumer. The
1768
consumer tag is local to a connection, so two clients
1769
can use the same consumer tags. If this field is empty
1770
the server will generate a unique tag.
1774
The tag MUST NOT refer to an existing consumer. If
1775
the client attempts to create two consumers with
1776
the same non-empty tag the server MUST raise a
1777
connection exception with reply code 530 (not
1782
do not deliver own messages
1784
If the no-local field is set the server will not send
1785
messages to the client that published them.
1789
no acknowledgement needed
1791
If this field is set the server does not expect
1792
acknowledgments for messages. That is, when a message
1793
is delivered to the client the server automatically and
1794
silently acknowledges it on behalf of the client. This
1795
functionality increases performance but at the cost of
1796
reliability. Messages can get lost if a client dies
1797
before it can deliver them to the application.
1801
request exclusive access
1803
Request exclusive consumer access, meaning only this
1804
consumer can access the queue.
1808
If the server cannot grant exclusive access to the
1809
queue when asked, - because there are other
1810
consumers active - it MUST raise a channel
1811
exception with return code 403 (access refused).
1815
do not send a reply method
1817
If set, the server will not respond to the method. The
1818
client should not wait for a reply method. If the
1819
server could not complete the method it will raise a
1820
channel or connection exception.
1822
callback: Python callable
1824
function/method called with each delivered message
1826
For each message delivered by the broker, the
1827
callable will be called with a Message object
1828
as the single argument. If no callable is specified,
1829
messages are quietly discarded, no_ack should probably
1830
be set to True in that case.
1836
The client MUST provide a valid access ticket
1837
giving "read" access rights to the realm for the
1842
if ticket is not None:
1843
args.write_short(ticket)
1845
args.write_short(self.default_ticket)
1846
args.write_shortstr(queue)
1847
args.write_shortstr(consumer_tag)
1848
args.write_bit(no_local)
1849
args.write_bit(no_ack)
1850
args.write_bit(exclusive)
1851
args.write_bit(nowait)
1852
self._send_method((60, 20), args)
1855
consumer_tag = self.wait(allowed_methods=[
1856
(60, 21), # Channel.basic_consume_ok
1859
self.callbacks[consumer_tag] = callback
1864
def _basic_consume_ok(self, args):
1866
confirm a new consumer
1868
The server provides the client with a consumer tag, which is
1869
used by the client for methods called on the consumer at a
1873
consumer_tag: shortstr
1875
Holds the consumer tag specified by the client or
1876
provided by the server.
1879
return args.read_shortstr()
1882
def _basic_deliver(self, args, msg):
1884
notify the client of a consumer message
1886
This method delivers a message to the client, via a consumer.
1887
In the asynchronous message delivery model, the client starts
1888
a consumer using the Consume method, then the server responds
1889
with Deliver methods as and when messages arrive for that
1894
The server SHOULD track the number of times a message has
1895
been delivered to clients and when a message is
1896
redelivered a certain number of times - e.g. 5 times -
1897
without being acknowledged, the server SHOULD consider the
1898
message to be unprocessable (possibly causing client
1899
applications to abort), and move the message to a dead
1903
consumer_tag: shortstr
1907
Identifier for the consumer, valid within the current
1912
The consumer tag is valid only within the channel
1913
from which the consumer was created. I.e. a client
1914
MUST NOT create a consumer in one channel and then
1917
delivery_tag: longlong
1919
server-assigned delivery tag
1921
The server-assigned and channel-specific delivery tag
1925
The delivery tag is valid only within the channel
1926
from which the message was received. I.e. a client
1927
MUST NOT receive a message on one channel and then
1928
acknowledge it on another.
1932
The server MUST NOT use a zero value for delivery
1933
tags. Zero is reserved for client use, meaning "all
1934
messages so far received".
1936
redelivered: boolean
1938
message is being redelivered
1940
This indicates that the message has been previously
1941
delivered to this or another client.
1945
Specifies the name of the exchange that the message
1946
was originally published to.
1948
routing_key: shortstr
1952
Specifies the routing key name specified when the
1953
message was published.
1956
consumer_tag = args.read_shortstr()
1957
delivery_tag = args.read_longlong()
1958
redelivered = args.read_bit()
1959
exchange = args.read_shortstr()
1960
routing_key = args.read_shortstr()
1962
msg.delivery_info = {
1964
'consumer_tag': consumer_tag,
1965
'delivery_tag': delivery_tag,
1966
'redelivered': redelivered,
1967
'exchange': exchange,
1968
'routing_key': routing_key,
1971
func = self.callbacks.get(consumer_tag, None)
1972
if func is not None:
1976
def basic_get(self, queue='', no_ack=False, ticket=None):
1978
direct access to a queue
1980
This method provides a direct access to the messages in a
1981
queue using a synchronous dialogue that is designed for
1982
specific types of application where synchronous functionality
1983
is more important than performance.
1988
Specifies the name of the queue to consume from. If
1989
the queue name is null, refers to the current queue
1990
for the channel, which is the last declared queue.
1994
If the client did not previously declare a queue,
1995
and the queue name in this method is empty, the
1996
server MUST raise a connection exception with
1997
reply code 530 (not allowed).
2001
no acknowledgement needed
2003
If this field is set the server does not expect
2004
acknowledgments for messages. That is, when a message
2005
is delivered to the client the server automatically and
2006
silently acknowledges it on behalf of the client. This
2007
functionality increases performance but at the cost of
2008
reliability. Messages can get lost if a client dies
2009
before it can deliver them to the application.
2015
The client MUST provide a valid access ticket
2016
giving "read" access rights to the realm for the
2019
Non-blocking, returns a message object, or None.
2023
if ticket is not None:
2024
args.write_short(ticket)
2026
args.write_short(self.default_ticket)
2027
args.write_shortstr(queue)
2028
args.write_bit(no_ack)
2029
self._send_method((60, 70), args)
2030
return self.wait(allowed_methods=[
2031
(60, 71), # Channel.basic_get_ok
2032
(60, 72), # Channel.basic_get_empty
2036
def _basic_get_empty(self, args):
2038
indicate no messages available
2040
This method tells the client that the queue has no messages
2041
available for the client.
2044
cluster_id: shortstr
2048
For use by cluster applications, should not be used by
2049
client applications.
2052
cluster_id = args.read_shortstr()
2055
def _basic_get_ok(self, args, msg):
2057
provide client with a message
2059
This method delivers a message to the client following a get
2060
method. A message delivered by 'get-ok' must be acknowledged
2061
unless the no-ack option was set in the get method.
2064
delivery_tag: longlong
2066
server-assigned delivery tag
2068
The server-assigned and channel-specific delivery tag
2072
The delivery tag is valid only within the channel
2073
from which the message was received. I.e. a client
2074
MUST NOT receive a message on one channel and then
2075
acknowledge it on another.
2079
The server MUST NOT use a zero value for delivery
2080
tags. Zero is reserved for client use, meaning "all
2081
messages so far received".
2083
redelivered: boolean
2085
message is being redelivered
2087
This indicates that the message has been previously
2088
delivered to this or another client.
2092
Specifies the name of the exchange that the message
2093
was originally published to. If empty, the message
2094
was published to the default exchange.
2096
routing_key: shortstr
2100
Specifies the routing key name specified when the
2101
message was published.
2105
number of messages pending
2107
This field reports the number of messages pending on
2108
the queue, excluding the message being delivered.
2109
Note that this figure is indicative, not reliable, and
2110
can change arbitrarily as messages are added to the
2111
queue and removed by other clients.
2114
delivery_tag = args.read_longlong()
2115
redelivered = args.read_bit()
2116
exchange = args.read_shortstr()
2117
routing_key = args.read_shortstr()
2118
message_count = args.read_long()
2120
msg.delivery_info = {
2121
'delivery_tag': delivery_tag,
2122
'redelivered': redelivered,
2123
'exchange': exchange,
2124
'routing_key': routing_key,
2125
'message_count': message_count
2131
def basic_publish(self, msg, exchange='', routing_key='',
2132
mandatory=False, immediate=False, ticket=None):
2136
This method publishes a message to a specific exchange. The
2137
message will be routed to queues as defined by the exchange
2138
configuration and distributed to any active consumers when the
2139
transaction, if any, is committed.
2144
Specifies the name of the exchange to publish to. The
2145
exchange name can be empty, meaning the default
2146
exchange. If the exchange name is specified, and that
2147
exchange does not exist, the server will raise a
2152
The server MUST accept a blank exchange name to
2153
mean the default exchange.
2157
If the exchange was declared as an internal
2158
exchange, the server MUST raise a channel
2159
exception with a reply code 403 (access refused).
2163
The exchange MAY refuse basic content in which
2164
case it MUST raise a channel exception with reply
2165
code 540 (not implemented).
2167
routing_key: shortstr
2171
Specifies the routing key for the message. The
2172
routing key is used for routing messages depending on
2173
the exchange configuration.
2177
indicate mandatory routing
2179
This flag tells the server how to react if the message
2180
cannot be routed to a queue. If this flag is True, the
2181
server will return an unroutable message with a Return
2182
method. If this flag is False, the server silently
2187
The server SHOULD implement the mandatory flag.
2191
request immediate delivery
2193
This flag tells the server how to react if the message
2194
cannot be routed to a queue consumer immediately. If
2195
this flag is set, the server will return an
2196
undeliverable message with a Return method. If this
2197
flag is zero, the server will queue the message, but
2198
with no guarantee that it will ever be consumed.
2202
The server SHOULD implement the immediate flag.
2208
The client MUST provide a valid access ticket
2209
giving "write" access rights to the access realm
2214
if ticket is not None:
2215
args.write_short(ticket)
2217
args.write_short(self.default_ticket)
2218
args.write_shortstr(exchange)
2219
args.write_shortstr(routing_key)
2220
args.write_bit(mandatory)
2221
args.write_bit(immediate)
2223
self._send_method((60, 40), args, msg)
2226
def basic_qos(self, prefetch_size, prefetch_count, a_global):
2228
specify quality of service
2230
This method requests a specific quality of service. The QoS
2231
can be specified for the current channel or for all channels
2232
on the connection. The particular properties and semantics of
2233
a qos method always depend on the content class semantics.
2234
Though the qos method could in principle apply to both peers,
2235
it is currently meaningful only for the server.
2240
prefetch window in octets
2242
The client can request that messages be sent in
2243
advance so that when the client finishes processing a
2244
message, the following message is already held
2245
locally, rather than needing to be sent down the
2246
channel. Prefetching gives a performance improvement.
2247
This field specifies the prefetch window size in
2248
octets. The server will send a message in advance if
2249
it is equal to or smaller in size than the available
2250
prefetch size (and also falls into other prefetch
2251
limits). May be set to zero, meaning "no specific
2252
limit", although other prefetch limits may still
2253
apply. The prefetch-size is ignored if the no-ack
2258
The server MUST ignore this setting when the
2259
client is not processing any messages - i.e. the
2260
prefetch size does not limit the transfer of
2261
single messages to a client, only the sending in
2262
advance of more messages while the client still
2263
has one or more unacknowledged messages.
2265
prefetch_count: short
2267
prefetch window in messages
2269
Specifies a prefetch window in terms of whole
2270
messages. This field may be used in combination with
2271
the prefetch-size field; a message will only be sent
2272
in advance if both prefetch windows (and those at the
2273
channel and connection level) allow it. The prefetch-
2274
count is ignored if the no-ack option is set.
2278
The server MAY send less data in advance than
2279
allowed by the client's specified prefetch windows
2280
but it MUST NOT send more.
2284
apply to entire connection
2286
By default the QoS settings apply to the current
2287
channel only. If this field is set, they are applied
2288
to the entire connection.
2292
args.write_long(prefetch_size)
2293
args.write_short(prefetch_count)
2294
args.write_bit(a_global)
2295
self._send_method((60, 10), args)
2296
return self.wait(allowed_methods=[
2297
(60, 11), # Channel.basic_qos_ok
2301
def _basic_qos_ok(self, args):
2303
confirm the requested qos
2305
This method tells the client that the requested QoS levels
2306
could be handled by the server. The requested QoS applies to
2307
all active consumers until a new QoS is defined.
2313
def basic_recover(self, requeue=False):
2315
redeliver unacknowledged messages
2317
This method asks the broker to redeliver all unacknowledged
2318
messages on a specified channel. Zero or more messages may be
2319
redelivered. This method is only allowed on non-transacted
2324
The server MUST set the redelivered flag on all messages
2329
The server MUST raise a channel exception if this is
2330
called on a transacted channel.
2337
If this field is False, the message will be redelivered
2338
to the original recipient. If this field is True, the
2339
server will attempt to requeue the message,
2340
potentially then delivering it to an alternative
2345
args.write_bit(requeue)
2346
self._send_method((60, 100), args)
2349
def basic_reject(self, delivery_tag, requeue):
2351
reject an incoming message
2353
This method allows a client to reject a message. It can be
2354
used to interrupt and cancel large incoming messages, or
2355
return untreatable messages to their original queue.
2359
The server SHOULD be capable of accepting and process the
2360
Reject method while sending message content with a Deliver
2361
or Get-Ok method. I.e. the server should read and process
2362
incoming methods while sending output frames. To cancel a
2363
partially-send content, the server sends a content body
2364
frame of size 1 (i.e. with no data except the frame-end
2369
The server SHOULD interpret this method as meaning that
2370
the client is unable to process the message at this time.
2374
A client MUST NOT use this method as a means of selecting
2375
messages to process. A rejected message MAY be discarded
2376
or dead-lettered, not necessarily passed to another
2380
delivery_tag: longlong
2382
server-assigned delivery tag
2384
The server-assigned and channel-specific delivery tag
2388
The delivery tag is valid only within the channel
2389
from which the message was received. I.e. a client
2390
MUST NOT receive a message on one channel and then
2391
acknowledge it on another.
2395
The server MUST NOT use a zero value for delivery
2396
tags. Zero is reserved for client use, meaning "all
2397
messages so far received".
2403
If this field is False, the message will be discarded.
2404
If this field is True, the server will attempt to
2405
requeue the message.
2409
The server MUST NOT deliver the message to the
2410
same client within the context of the current
2411
channel. The recommended strategy is to attempt
2412
to deliver the message to an alternative consumer,
2413
and if that is not possible, to move the message
2414
to a dead-letter queue. The server MAY use more
2415
sophisticated tracking to hold the message on the
2416
queue and redeliver it to the same client at a
2421
args.write_longlong(delivery_tag)
2422
args.write_bit(requeue)
2423
self._send_method((60, 90), args)
2426
def _basic_return(self, args, msg):
2428
return a failed message
2430
This method returns an undeliverable message that was
2431
published with the "immediate" flag set, or an unroutable
2432
message published with the "mandatory" flag set. The reply
2433
code and text provide information about the reason that the
2434
message was undeliverable.
2439
The reply code. The AMQ reply codes are defined in AMQ
2442
reply_text: shortstr
2444
The localised reply text. This text can be logged as an
2445
aid to resolving issues.
2449
Specifies the name of the exchange that the message
2450
was originally published to.
2452
routing_key: shortstr
2456
Specifies the routing key name specified when the
2457
message was published.
2460
reply_code = args.read_short()
2461
reply_text = args.read_shortstr()
2462
exchange = args.read_shortstr()
2463
routing_key = args.read_shortstr()
2465
self.returned_messages.put(
2466
(reply_code, reply_text, exchange, routing_key, msg)
2475
# work with standard transactions
2477
# Standard transactions provide so-called "1.5 phase commit". We
2478
# can ensure that work is never lost, but there is a chance of
2479
# confirmations being lost, so that messages may be resent.
2480
# Applications that use standard transactions must be able to
2481
# detect and ignore duplicate messages.
2485
# tx = C:SELECT S:SELECT-OK
2486
# / C:COMMIT S:COMMIT-OK
2487
# / C:ROLLBACK S:ROLLBACK-OK
2491
# An client using standard transactions SHOULD be able to
2492
# track all messages received within a reasonable period, and
2493
# thus detect and reject duplicates of the same message. It
2494
# SHOULD NOT pass these to the application layer.
2498
def tx_commit(self):
2500
commit the current transaction
2502
This method commits all messages published and acknowledged in
2503
the current transaction. A new transaction starts immediately
2507
self._send_method((90, 20))
2508
return self.wait(allowed_methods=[
2509
(90, 21), # Channel.tx_commit_ok
2513
def _tx_commit_ok(self, args):
2515
confirm a successful commit
2517
This method confirms to the client that the commit succeeded.
2518
Note that if a commit fails, the server raises a channel
2525
def tx_rollback(self):
2527
abandon the current transaction
2529
This method abandons all messages published and acknowledged
2530
in the current transaction. A new transaction starts
2531
immediately after a rollback.
2534
self._send_method((90, 30))
2535
return self.wait(allowed_methods=[
2536
(90, 31), # Channel.tx_rollback_ok
2540
def _tx_rollback_ok(self, args):
2542
confirm a successful rollback
2544
This method confirms to the client that the rollback
2545
succeeded. Note that if an rollback fails, the server raises a
2552
def tx_select(self):
2554
select standard transaction mode
2556
This method sets the channel to use standard transactions.
2557
The client must use this method at least once on a channel
2558
before using the Commit or Rollback methods.
2561
self._send_method((90, 10))
2562
return self.wait(allowed_methods=[
2563
(90, 11), # Channel.tx_select_ok
2567
def _tx_select_ok(self, args):
2569
confirm transaction mode
2571
This method confirms to the client that the channel was
2572
successfully set to use standard transactions.
2584
(20, 41): _close_ok,
2585
(30, 11): _access_request_ok,
2586
(40, 11): _exchange_declare_ok,
2587
(40, 21): _exchange_delete_ok,
2588
(50, 11): _queue_declare_ok,
2589
(50, 21): _queue_bind_ok,
2590
(50, 31): _queue_purge_ok,
2591
(50, 41): _queue_delete_ok,
2592
(60, 11): _basic_qos_ok,
2593
(60, 21): _basic_consume_ok,
2594
(60, 31): _basic_cancel_ok,
2595
(60, 50): _basic_return,
2596
(60, 60): _basic_deliver,
2597
(60, 71): _basic_get_ok,
2598
(60, 72): _basic_get_empty,
2599
(90, 11): _tx_select_ok,
2600
(90, 21): _tx_commit_ok,
2601
(90, 31): _tx_rollback_ok,