~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/amqplib/client_0_8/channel.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
AMQP 0-8 Channels
 
3
 
 
4
"""
 
5
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
 
6
#
 
7
# This library is free software; you can redistribute it and/or
 
8
# modify it under the terms of the GNU Lesser General Public
 
9
# License as published by the Free Software Foundation; either
 
10
# version 2.1 of the License, or (at your option) any later version.
 
11
#
 
12
# This library is distributed in the hope that it will be useful,
 
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 
15
# Lesser General Public License for more details.
 
16
#
 
17
# You should have received a copy of the GNU Lesser General Public
 
18
# License along with this library; if not, write to the Free Software
 
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301
 
20
 
 
21
import logging
 
22
from Queue import Queue
 
23
 
 
24
from abstract_channel import AbstractChannel
 
25
from exceptions import *
 
26
from serialization import AMQPWriter
 
27
 
 
28
__all__ =  [
 
29
            'Channel',      # here mainly so it shows in in pydoc
 
30
           ]
 
31
 
 
32
AMQP_LOGGER = logging.getLogger('amqplib')
 
33
 
 
34
 
 
35
class Channel(AbstractChannel):
 
36
    """
 
37
    work with channels
 
38
 
 
39
    The channel class provides methods for a client to establish a
 
40
    virtual connection - a channel - to a server and for both peers to
 
41
    operate the virtual connection thereafter.
 
42
 
 
43
    GRAMMAR:
 
44
 
 
45
        channel             = open-channel *use-channel close-channel
 
46
        open-channel        = C:OPEN S:OPEN-OK
 
47
        use-channel         = C:FLOW S:FLOW-OK
 
48
                            / S:FLOW C:FLOW-OK
 
49
                            / S:ALERT
 
50
                            / functional-class
 
51
        close-channel       = C:CLOSE S:CLOSE-OK
 
52
                            / S:CLOSE C:CLOSE-OK
 
53
 
 
54
    """
 
55
    def __init__(self, connection, channel_id=None, auto_decode=True):
 
56
        """
 
57
        Create a channel bound to a connection and using the specified
 
58
        numeric channel_id, and open on the server.
 
59
 
 
60
        The 'auto_decode' parameter (defaults to True), indicates
 
61
        whether the library should attempt to decode the body
 
62
        of Messages to a Unicode string if there's a 'content_encoding'
 
63
        property for the message.  If there's no 'content_encoding'
 
64
        property, or the decode raises an Exception, the plain string
 
65
        is left as the message body.
 
66
 
 
67
        """
 
68
        if channel_id is None:
 
69
            channel_id = connection._get_free_channel_id()
 
70
        AMQP_LOGGER.debug('using channel_id: %d' % channel_id)
 
71
 
 
72
        super(Channel, self).__init__(connection, channel_id)
 
73
 
 
74
        self.default_ticket = 0
 
75
        self.is_open = False
 
76
        self.active = True # Flow control
 
77
        self.alerts = Queue()
 
78
        self.returned_messages = Queue()
 
79
        self.callbacks = {}
 
80
        self.auto_decode = auto_decode
 
81
 
 
82
        self._x_open()
 
83
 
 
84
 
 
85
    def _do_close(self):
 
86
        """
 
87
        Tear down this object, after we've agreed to close with the server.
 
88
 
 
89
        """
 
90
        AMQP_LOGGER.debug('Closed channel #%d' % self.channel_id)
 
91
        self.is_open = False
 
92
        del self.connection.channels[self.channel_id]
 
93
        self.channel_id = self.connection = None
 
94
        self.callbacks = {}
 
95
 
 
96
 
 
97
    #################
 
98
 
 
99
    def _alert(self, args):
 
100
        """
 
101
        This method allows the server to send a non-fatal warning to
 
102
        the client.  This is used for methods that are normally
 
103
        asynchronous and thus do not have confirmations, and for which
 
104
        the server may detect errors that need to be reported.  Fatal
 
105
        errors are handled as channel or connection exceptions; non-
 
106
        fatal errors are sent through this method.
 
107
 
 
108
        PARAMETERS:
 
109
            reply_code: short
 
110
 
 
111
                The reply code. The AMQ reply codes are defined in AMQ
 
112
                RFC 011.
 
113
 
 
114
            reply_text: shortstr
 
115
 
 
116
                The localised reply text.  This text can be logged as an
 
117
                aid to resolving issues.
 
118
 
 
119
            details: table
 
120
 
 
121
                detailed information for warning
 
122
 
 
123
                A set of fields that provide more information about
 
124
                the problem.  The meaning of these fields are defined
 
125
                on a per-reply-code basis (TO BE DEFINED).
 
126
 
 
127
        """
 
128
        reply_code = args.read_short()
 
129
        reply_text = args.read_shortstr()
 
130
        details = args.read_table()
 
131
 
 
132
        self.alerts.put((reply_code, reply_text, details))
 
133
 
 
134
 
 
135
    def close(self, reply_code=0, reply_text='', method_sig=(0, 0)):
 
136
        """
 
137
        request a channel close
 
138
 
 
139
        This method indicates that the sender wants to close the
 
140
        channel. This may be due to internal conditions (e.g. a forced
 
141
        shut-down) or due to an error handling a specific method, i.e.
 
142
        an exception.  When a close is due to an exception, the sender
 
143
        provides the class and method id of the method which caused
 
144
        the exception.
 
145
 
 
146
        RULE:
 
147
 
 
148
            After sending this method any received method except
 
149
            Channel.Close-OK MUST be discarded.
 
150
 
 
151
        RULE:
 
152
 
 
153
            The peer sending this method MAY use a counter or timeout
 
154
            to detect failure of the other peer to respond correctly
 
155
            with Channel.Close-OK..
 
156
 
 
157
        PARAMETERS:
 
158
            reply_code: short
 
159
 
 
160
                The reply code. The AMQ reply codes are defined in AMQ
 
161
                RFC 011.
 
162
 
 
163
            reply_text: shortstr
 
164
 
 
165
                The localised reply text.  This text can be logged as an
 
166
                aid to resolving issues.
 
167
 
 
168
            class_id: short
 
169
 
 
170
                failing method class
 
171
 
 
172
                When the close is provoked by a method exception, this
 
173
                is the class of the method.
 
174
 
 
175
            method_id: short
 
176
 
 
177
                failing method ID
 
178
 
 
179
                When the close is provoked by a method exception, this
 
180
                is the ID of the method.
 
181
 
 
182
        """
 
183
        if not self.is_open:
 
184
            # already closed
 
185
            return
 
186
 
 
187
        args = AMQPWriter()
 
188
        args.write_short(reply_code)
 
189
        args.write_shortstr(reply_text)
 
190
        args.write_short(method_sig[0]) # class_id
 
191
        args.write_short(method_sig[1]) # method_id
 
192
        self._send_method((20, 40), args)
 
193
        return self.wait(allowed_methods=[
 
194
                          (20, 41),    # Channel.close_ok
 
195
                        ])
 
196
 
 
197
 
 
198
    def _close(self, args):
 
199
        """
 
200
        request a channel close
 
201
 
 
202
        This method indicates that the sender wants to close the
 
203
        channel. This may be due to internal conditions (e.g. a forced
 
204
        shut-down) or due to an error handling a specific method, i.e.
 
205
        an exception.  When a close is due to an exception, the sender
 
206
        provides the class and method id of the method which caused
 
207
        the exception.
 
208
 
 
209
        RULE:
 
210
 
 
211
            After sending this method any received method except
 
212
            Channel.Close-OK MUST be discarded.
 
213
 
 
214
        RULE:
 
215
 
 
216
            The peer sending this method MAY use a counter or timeout
 
217
            to detect failure of the other peer to respond correctly
 
218
            with Channel.Close-OK..
 
219
 
 
220
        PARAMETERS:
 
221
            reply_code: short
 
222
 
 
223
                The reply code. The AMQ reply codes are defined in AMQ
 
224
                RFC 011.
 
225
 
 
226
            reply_text: shortstr
 
227
 
 
228
                The localised reply text.  This text can be logged as an
 
229
                aid to resolving issues.
 
230
 
 
231
            class_id: short
 
232
 
 
233
                failing method class
 
234
 
 
235
                When the close is provoked by a method exception, this
 
236
                is the class of the method.
 
237
 
 
238
            method_id: short
 
239
 
 
240
                failing method ID
 
241
 
 
242
                When the close is provoked by a method exception, this
 
243
                is the ID of the method.
 
244
 
 
245
        """
 
246
        reply_code = args.read_short()
 
247
        reply_text = args.read_shortstr()
 
248
        class_id = args.read_short()
 
249
        method_id = args.read_short()
 
250
 
 
251
#        self.close_ok()
 
252
 
 
253
 
 
254
#    def close_ok(self):
 
255
#        """
 
256
#        confirm a channel close
 
257
#
 
258
#        This method confirms a Channel.Close method and tells the
 
259
#        recipient that it is safe to release resources for the channel
 
260
#        and close the socket.
 
261
#
 
262
#        RULE:
 
263
#
 
264
#            A peer that detects a socket closure without having
 
265
#            received a Channel.Close-Ok handshake method SHOULD log
 
266
#            the error.
 
267
#
 
268
#        """
 
269
        self._send_method((20, 41))
 
270
        self._do_close()
 
271
 
 
272
        raise AMQPChannelException(reply_code, reply_text,
 
273
            (class_id, method_id))
 
274
 
 
275
 
 
276
    def _close_ok(self, args):
 
277
        """
 
278
        confirm a channel close
 
279
 
 
280
        This method confirms a Channel.Close method and tells the
 
281
        recipient that it is safe to release resources for the channel
 
282
        and close the socket.
 
283
 
 
284
        RULE:
 
285
 
 
286
            A peer that detects a socket closure without having
 
287
            received a Channel.Close-Ok handshake method SHOULD log
 
288
            the error.
 
289
 
 
290
        """
 
291
        self._do_close()
 
292
 
 
293
 
 
294
    def flow(self, active):
 
295
        """
 
296
        enable/disable flow from peer
 
297
 
 
298
        This method asks the peer to pause or restart the flow of
 
299
        content data. This is a simple flow-control mechanism that a
 
300
        peer can use to avoid oveflowing its queues or otherwise
 
301
        finding itself receiving more messages than it can process.
 
302
        Note that this method is not intended for window control.  The
 
303
        peer that receives a request to stop sending content should
 
304
        finish sending the current content, if any, and then wait
 
305
        until it receives a Flow restart method.
 
306
 
 
307
        RULE:
 
308
 
 
309
            When a new channel is opened, it is active.  Some
 
310
            applications assume that channels are inactive until
 
311
            started.  To emulate this behaviour a client MAY open the
 
312
            channel, then pause it.
 
313
 
 
314
        RULE:
 
315
 
 
316
            When sending content data in multiple frames, a peer
 
317
            SHOULD monitor the channel for incoming methods and
 
318
            respond to a Channel.Flow as rapidly as possible.
 
319
 
 
320
        RULE:
 
321
 
 
322
            A peer MAY use the Channel.Flow method to throttle
 
323
            incoming content data for internal reasons, for example,
 
324
            when exchangeing data over a slower connection.
 
325
 
 
326
        RULE:
 
327
 
 
328
            The peer that requests a Channel.Flow method MAY
 
329
            disconnect and/or ban a peer that does not respect the
 
330
            request.
 
331
 
 
332
        PARAMETERS:
 
333
            active: boolean
 
334
 
 
335
                start/stop content frames
 
336
 
 
337
                If True, the peer starts sending content frames.  If
 
338
                False, the peer stops sending content frames.
 
339
 
 
340
        """
 
341
        args = AMQPWriter()
 
342
        args.write_bit(active)
 
343
        self._send_method((20, 20), args)
 
344
        return self.wait(allowed_methods=[
 
345
                          (20, 21),    # Channel.flow_ok
 
346
                        ])
 
347
 
 
348
 
 
349
    def _flow(self, args):
 
350
        """
 
351
        enable/disable flow from peer
 
352
 
 
353
        This method asks the peer to pause or restart the flow of
 
354
        content data. This is a simple flow-control mechanism that a
 
355
        peer can use to avoid oveflowing its queues or otherwise
 
356
        finding itself receiving more messages than it can process.
 
357
        Note that this method is not intended for window control.  The
 
358
        peer that receives a request to stop sending content should
 
359
        finish sending the current content, if any, and then wait
 
360
        until it receives a Flow restart method.
 
361
 
 
362
        RULE:
 
363
 
 
364
            When a new channel is opened, it is active.  Some
 
365
            applications assume that channels are inactive until
 
366
            started.  To emulate this behaviour a client MAY open the
 
367
            channel, then pause it.
 
368
 
 
369
        RULE:
 
370
 
 
371
            When sending content data in multiple frames, a peer
 
372
            SHOULD monitor the channel for incoming methods and
 
373
            respond to a Channel.Flow as rapidly as possible.
 
374
 
 
375
        RULE:
 
376
 
 
377
            A peer MAY use the Channel.Flow method to throttle
 
378
            incoming content data for internal reasons, for example,
 
379
            when exchangeing data over a slower connection.
 
380
 
 
381
        RULE:
 
382
 
 
383
            The peer that requests a Channel.Flow method MAY
 
384
            disconnect and/or ban a peer that does not respect the
 
385
            request.
 
386
 
 
387
        PARAMETERS:
 
388
            active: boolean
 
389
 
 
390
                start/stop content frames
 
391
 
 
392
                If True, the peer starts sending content frames.  If
 
393
                False, the peer stops sending content frames.
 
394
 
 
395
        """
 
396
        self.active = args.read_bit()
 
397
 
 
398
        self._x_flow_ok(self.active)
 
399
 
 
400
 
 
401
    def _x_flow_ok(self, active):
 
402
        """
 
403
        confirm a flow method
 
404
 
 
405
        Confirms to the peer that a flow command was received and
 
406
        processed.
 
407
 
 
408
        PARAMETERS:
 
409
            active: boolean
 
410
 
 
411
                current flow setting
 
412
 
 
413
                Confirms the setting of the processed flow method:
 
414
                True means the peer will start sending or continue
 
415
                to send content frames; False means it will not.
 
416
 
 
417
        """
 
418
        args = AMQPWriter()
 
419
        args.write_bit(active)
 
420
        self._send_method((20, 21), args)
 
421
 
 
422
 
 
423
    def _flow_ok(self, args):
 
424
        """
 
425
        confirm a flow method
 
426
 
 
427
        Confirms to the peer that a flow command was received and
 
428
        processed.
 
429
 
 
430
        PARAMETERS:
 
431
            active: boolean
 
432
 
 
433
                current flow setting
 
434
 
 
435
                Confirms the setting of the processed flow method:
 
436
                True means the peer will start sending or continue
 
437
                to send content frames; False means it will not.
 
438
 
 
439
        """
 
440
        return args.read_bit()
 
441
 
 
442
 
 
443
    def _x_open(self, out_of_band=''):
 
444
        """
 
445
        open a channel for use
 
446
 
 
447
        This method opens a virtual connection (a channel).
 
448
 
 
449
        RULE:
 
450
 
 
451
            This method MUST NOT be called when the channel is already
 
452
            open.
 
453
 
 
454
        PARAMETERS:
 
455
            out_of_band: shortstr
 
456
 
 
457
                out-of-band settings
 
458
 
 
459
                Configures out-of-band transfers on this channel.  The
 
460
                syntax and meaning of this field will be formally
 
461
                defined at a later date.
 
462
 
 
463
        """
 
464
        if self.is_open:
 
465
            return
 
466
 
 
467
        args = AMQPWriter()
 
468
        args.write_shortstr(out_of_band)
 
469
        self._send_method((20, 10), args)
 
470
        return self.wait(allowed_methods=[
 
471
                          (20, 11),    # Channel.open_ok
 
472
                        ])
 
473
 
 
474
 
 
475
    def _open_ok(self, args):
 
476
        """
 
477
        signal that the channel is ready
 
478
 
 
479
        This method signals to the client that the channel is ready
 
480
        for use.
 
481
 
 
482
        """
 
483
        self.is_open = True
 
484
        AMQP_LOGGER.debug('Channel open')
 
485
 
 
486
 
 
487
    #############
 
488
    #
 
489
    #  Access
 
490
    #
 
491
    #
 
492
    # work with access tickets
 
493
    #
 
494
    # The protocol control access to server resources using access
 
495
    # tickets. A client must explicitly request access tickets before
 
496
    # doing work. An access ticket grants a client the right to use a
 
497
    # specific set of resources - called a "realm" - in specific ways.
 
498
    #
 
499
    # GRAMMAR:
 
500
    #
 
501
    #     access              = C:REQUEST S:REQUEST-OK
 
502
    #
 
503
    #
 
504
 
 
505
    def access_request(self, realm, exclusive=False,
 
506
        passive=False, active=False, write=False, read=False):
 
507
        """
 
508
        request an access ticket
 
509
 
 
510
        This method requests an access ticket for an access realm. The
 
511
        server responds by granting the access ticket.  If the client
 
512
        does not have access rights to the requested realm this causes
 
513
        a connection exception.  Access tickets are a per-channel
 
514
        resource.
 
515
 
 
516
        RULE:
 
517
 
 
518
            The realm name MUST start with either "/data" (for
 
519
            application resources) or "/admin" (for server
 
520
            administration resources). If the realm starts with any
 
521
            other path, the server MUST raise a connection exception
 
522
            with reply code 403 (access refused).
 
523
 
 
524
        RULE:
 
525
 
 
526
            The server MUST implement the /data realm and MAY
 
527
            implement the /admin realm.  The mapping of resources to
 
528
            realms is not defined in the protocol - this is a server-
 
529
            side configuration issue.
 
530
 
 
531
        PARAMETERS:
 
532
            realm: shortstr
 
533
 
 
534
                name of requested realm
 
535
 
 
536
                RULE:
 
537
 
 
538
                    If the specified realm is not known to the server,
 
539
                    the server must raise a channel exception with
 
540
                    reply code 402 (invalid path).
 
541
 
 
542
            exclusive: boolean
 
543
 
 
544
                request exclusive access
 
545
 
 
546
                Request exclusive access to the realm. If the server
 
547
                cannot grant this - because there are other active
 
548
                tickets for the realm - it raises a channel exception.
 
549
 
 
550
            passive: boolean
 
551
 
 
552
                request passive access
 
553
 
 
554
                Request message passive access to the specified access
 
555
                realm. Passive access lets a client get information
 
556
                about resources in the realm but not to make any
 
557
                changes to them.
 
558
 
 
559
            active: boolean
 
560
 
 
561
                request active access
 
562
 
 
563
                Request message active access to the specified access
 
564
                realm. Acvtive access lets a client get create and
 
565
                delete resources in the realm.
 
566
 
 
567
            write: boolean
 
568
 
 
569
                request write access
 
570
 
 
571
                Request write access to the specified access realm.
 
572
                Write access lets a client publish messages to all
 
573
                exchanges in the realm.
 
574
 
 
575
            read: boolean
 
576
 
 
577
                request read access
 
578
 
 
579
                Request read access to the specified access realm.
 
580
                Read access lets a client consume messages from queues
 
581
                in the realm.
 
582
 
 
583
        The most recently requested ticket is used as the channel's
 
584
        default ticket for any method that requires a ticket.
 
585
 
 
586
        """
 
587
        args = AMQPWriter()
 
588
        args.write_shortstr(realm)
 
589
        args.write_bit(exclusive)
 
590
        args.write_bit(passive)
 
591
        args.write_bit(active)
 
592
        args.write_bit(write)
 
593
        args.write_bit(read)
 
594
        self._send_method((30, 10), args)
 
595
        return self.wait(allowed_methods=[
 
596
                          (30, 11),    # Channel.access_request_ok
 
597
                        ])
 
598
 
 
599
 
 
600
    def _access_request_ok(self, args):
 
601
        """
 
602
        grant access to server resources
 
603
 
 
604
        This method provides the client with an access ticket. The
 
605
        access ticket is valid within the current channel and for the
 
606
        lifespan of the channel.
 
607
 
 
608
        RULE:
 
609
 
 
610
            The client MUST NOT use access tickets except within the
 
611
            same channel as originally granted.
 
612
 
 
613
        RULE:
 
614
 
 
615
            The server MUST isolate access tickets per channel and
 
616
            treat an attempt by a client to mix these as a connection
 
617
            exception.
 
618
 
 
619
        PARAMETERS:
 
620
            ticket: short
 
621
 
 
622
        """
 
623
        self.default_ticket = args.read_short()
 
624
        return self.default_ticket
 
625
 
 
626
 
 
627
    #############
 
628
    #
 
629
    #  Exchange
 
630
    #
 
631
    #
 
632
    # work with exchanges
 
633
    #
 
634
    # Exchanges match and distribute messages across queues.
 
635
    # Exchanges can be configured in the server or created at runtime.
 
636
    #
 
637
    # GRAMMAR:
 
638
    #
 
639
    #     exchange            = C:DECLARE  S:DECLARE-OK
 
640
    #                         / C:DELETE   S:DELETE-OK
 
641
    #
 
642
    # RULE:
 
643
    #
 
644
    #     The server MUST implement the direct and fanout exchange
 
645
    #     types, and predeclare the corresponding exchanges named
 
646
    #     amq.direct and amq.fanout in each virtual host. The server
 
647
    #     MUST also predeclare a direct exchange to act as the default
 
648
    #     exchange for content Publish methods and for default queue
 
649
    #     bindings.
 
650
    #
 
651
    # RULE:
 
652
    #
 
653
    #     The server SHOULD implement the topic exchange type, and
 
654
    #     predeclare the corresponding exchange named amq.topic in
 
655
    #     each virtual host.
 
656
    #
 
657
    # RULE:
 
658
    #
 
659
    #     The server MAY implement the system exchange type, and
 
660
    #     predeclare the corresponding exchanges named amq.system in
 
661
    #     each virtual host. If the client attempts to bind a queue to
 
662
    #     the system exchange, the server MUST raise a connection
 
663
    #     exception with reply code 507 (not allowed).
 
664
    #
 
665
    # RULE:
 
666
    #
 
667
    #     The default exchange MUST be defined as internal, and be
 
668
    #     inaccessible to the client except by specifying an empty
 
669
    #     exchange name in a content Publish method. That is, the
 
670
    #     server MUST NOT let clients make explicit bindings to this
 
671
    #     exchange.
 
672
    #
 
673
    #
 
674
 
 
675
    def exchange_declare(self, exchange, type, passive=False, durable=False,
 
676
        auto_delete=True, internal=False, nowait=False,
 
677
        arguments=None, ticket=None):
 
678
        """
 
679
        declare exchange, create if needed
 
680
 
 
681
        This method creates an exchange if it does not already exist,
 
682
        and if the exchange exists, verifies that it is of the correct
 
683
        and expected class.
 
684
 
 
685
        RULE:
 
686
 
 
687
            The server SHOULD support a minimum of 16 exchanges per
 
688
            virtual host and ideally, impose no limit except as
 
689
            defined by available resources.
 
690
 
 
691
        PARAMETERS:
 
692
            exchange: shortstr
 
693
 
 
694
                RULE:
 
695
 
 
696
                    Exchange names starting with "amq." are reserved
 
697
                    for predeclared and standardised exchanges.  If
 
698
                    the client attempts to create an exchange starting
 
699
                    with "amq.", the server MUST raise a channel
 
700
                    exception with reply code 403 (access refused).
 
701
 
 
702
            type: shortstr
 
703
 
 
704
                exchange type
 
705
 
 
706
                Each exchange belongs to one of a set of exchange
 
707
                types implemented by the server.  The exchange types
 
708
                define the functionality of the exchange - i.e. how
 
709
                messages are routed through it.  It is not valid or
 
710
                meaningful to attempt to change the type of an
 
711
                existing exchange.
 
712
 
 
713
                RULE:
 
714
 
 
715
                    If the exchange already exists with a different
 
716
                    type, the server MUST raise a connection exception
 
717
                    with a reply code 507 (not allowed).
 
718
 
 
719
                RULE:
 
720
 
 
721
                    If the server does not support the requested
 
722
                    exchange type it MUST raise a connection exception
 
723
                    with a reply code 503 (command invalid).
 
724
 
 
725
            passive: boolean
 
726
 
 
727
                do not create exchange
 
728
 
 
729
                If set, the server will not create the exchange.  The
 
730
                client can use this to check whether an exchange
 
731
                exists without modifying the server state.
 
732
 
 
733
                RULE:
 
734
 
 
735
                    If set, and the exchange does not already exist,
 
736
                    the server MUST raise a channel exception with
 
737
                    reply code 404 (not found).
 
738
 
 
739
            durable: boolean
 
740
 
 
741
                request a durable exchange
 
742
 
 
743
                If set when creating a new exchange, the exchange will
 
744
                be marked as durable.  Durable exchanges remain active
 
745
                when a server restarts. Non-durable exchanges
 
746
                (transient exchanges) are purged if/when a server
 
747
                restarts.
 
748
 
 
749
                RULE:
 
750
 
 
751
                    The server MUST support both durable and transient
 
752
                    exchanges.
 
753
 
 
754
                RULE:
 
755
 
 
756
                    The server MUST ignore the durable field if the
 
757
                    exchange already exists.
 
758
 
 
759
            auto_delete: boolean
 
760
 
 
761
                auto-delete when unused
 
762
 
 
763
                If set, the exchange is deleted when all queues have
 
764
                finished using it.
 
765
 
 
766
                RULE:
 
767
 
 
768
                    The server SHOULD allow for a reasonable delay
 
769
                    between the point when it determines that an
 
770
                    exchange is not being used (or no longer used),
 
771
                    and the point when it deletes the exchange.  At
 
772
                    the least it must allow a client to create an
 
773
                    exchange and then bind a queue to it, with a small
 
774
                    but non-zero delay between these two actions.
 
775
 
 
776
                RULE:
 
777
 
 
778
                    The server MUST ignore the auto-delete field if
 
779
                    the exchange already exists.
 
780
 
 
781
            internal: boolean
 
782
 
 
783
                create internal exchange
 
784
 
 
785
                If set, the exchange may not be used directly by
 
786
                publishers, but only when bound to other exchanges.
 
787
                Internal exchanges are used to construct wiring that
 
788
                is not visible to applications.
 
789
 
 
790
            nowait: boolean
 
791
 
 
792
                do not send a reply method
 
793
 
 
794
                If set, the server will not respond to the method. The
 
795
                client should not wait for a reply method.  If the
 
796
                server could not complete the method it will raise a
 
797
                channel or connection exception.
 
798
 
 
799
            arguments: table
 
800
 
 
801
                arguments for declaration
 
802
 
 
803
                A set of arguments for the declaration. The syntax and
 
804
                semantics of these arguments depends on the server
 
805
                implementation.  This field is ignored if passive is
 
806
                True.
 
807
 
 
808
            ticket: short
 
809
 
 
810
                When a client defines a new exchange, this belongs to
 
811
                the access realm of the ticket used.  All further work
 
812
                done with that exchange must be done with an access
 
813
                ticket for the same realm.
 
814
 
 
815
                RULE:
 
816
 
 
817
                    The client MUST provide a valid access ticket
 
818
                    giving "active" access to the realm in which the
 
819
                    exchange exists or will be created, or "passive"
 
820
                    access if the if-exists flag is set.
 
821
 
 
822
        """
 
823
        if arguments is None:
 
824
            arguments = {}
 
825
 
 
826
        args = AMQPWriter()
 
827
        if ticket is not None:
 
828
            args.write_short(ticket)
 
829
        else:
 
830
            args.write_short(self.default_ticket)
 
831
        args.write_shortstr(exchange)
 
832
        args.write_shortstr(type)
 
833
        args.write_bit(passive)
 
834
        args.write_bit(durable)
 
835
        args.write_bit(auto_delete)
 
836
        args.write_bit(internal)
 
837
        args.write_bit(nowait)
 
838
        args.write_table(arguments)
 
839
        self._send_method((40, 10), args)
 
840
 
 
841
        if not nowait:
 
842
            return self.wait(allowed_methods=[
 
843
                              (40, 11),    # Channel.exchange_declare_ok
 
844
                            ])
 
845
 
 
846
 
 
847
    def _exchange_declare_ok(self, args):
 
848
        """
 
849
        confirms an exchange declaration
 
850
 
 
851
        This method confirms a Declare method and confirms the name of
 
852
        the exchange, essential for automatically-named exchanges.
 
853
 
 
854
        """
 
855
        pass
 
856
 
 
857
 
 
858
    def exchange_delete(self, exchange, if_unused=False,
 
859
        nowait=False, ticket=None):
 
860
        """
 
861
        delete an exchange
 
862
 
 
863
        This method deletes an exchange.  When an exchange is deleted
 
864
        all queue bindings on the exchange are cancelled.
 
865
 
 
866
        PARAMETERS:
 
867
            exchange: shortstr
 
868
 
 
869
                RULE:
 
870
 
 
871
                    The exchange MUST exist. Attempting to delete a
 
872
                    non-existing exchange causes a channel exception.
 
873
 
 
874
            if_unused: boolean
 
875
 
 
876
                delete only if unused
 
877
 
 
878
                If set, the server will only delete the exchange if it
 
879
                has no queue bindings. If the exchange has queue
 
880
                bindings the server does not delete it but raises a
 
881
                channel exception instead.
 
882
 
 
883
                RULE:
 
884
 
 
885
                    If set, the server SHOULD delete the exchange but
 
886
                    only if it has no queue bindings.
 
887
 
 
888
                RULE:
 
889
 
 
890
                    If set, the server SHOULD raise a channel
 
891
                    exception if the exchange is in use.
 
892
 
 
893
            nowait: boolean
 
894
 
 
895
                do not send a reply method
 
896
 
 
897
                If set, the server will not respond to the method. The
 
898
                client should not wait for a reply method.  If the
 
899
                server could not complete the method it will raise a
 
900
                channel or connection exception.
 
901
 
 
902
            ticket: short
 
903
 
 
904
                RULE:
 
905
 
 
906
                    The client MUST provide a valid access ticket
 
907
                    giving "active" access rights to the exchange's
 
908
                    access realm.
 
909
 
 
910
        """
 
911
        args = AMQPWriter()
 
912
        if ticket is not None:
 
913
            args.write_short(ticket)
 
914
        else:
 
915
            args.write_short(self.default_ticket)
 
916
        args.write_shortstr(exchange)
 
917
        args.write_bit(if_unused)
 
918
        args.write_bit(nowait)
 
919
        self._send_method((40, 20), args)
 
920
 
 
921
        if not nowait:
 
922
            return self.wait(allowed_methods=[
 
923
                              (40, 21),    # Channel.exchange_delete_ok
 
924
                            ])
 
925
 
 
926
 
 
927
    def _exchange_delete_ok(self, args):
 
928
        """
 
929
        confirm deletion of an exchange
 
930
 
 
931
        This method confirms the deletion of an exchange.
 
932
 
 
933
        """
 
934
        pass
 
935
 
 
936
 
 
937
    #############
 
938
    #
 
939
    #  Queue
 
940
    #
 
941
    #
 
942
    # work with queues
 
943
    #
 
944
    # Queues store and forward messages.  Queues can be configured in
 
945
    # the server or created at runtime.  Queues must be attached to at
 
946
    # least one exchange in order to receive messages from publishers.
 
947
    #
 
948
    # GRAMMAR:
 
949
    #
 
950
    #     queue               = C:DECLARE  S:DECLARE-OK
 
951
    #                         / C:BIND     S:BIND-OK
 
952
    #                         / C:PURGE    S:PURGE-OK
 
953
    #                         / C:DELETE   S:DELETE-OK
 
954
    #
 
955
    # RULE:
 
956
    #
 
957
    #     A server MUST allow any content class to be sent to any
 
958
    #     queue, in any mix, and queue and delivery these content
 
959
    #     classes independently. Note that all methods that fetch
 
960
    #     content off queues are specific to a given content class.
 
961
    #
 
962
    #
 
963
 
 
964
    def queue_bind(self, queue, exchange, routing_key='',
 
965
        nowait=False, arguments=None, ticket=None):
 
966
        """
 
967
        bind queue to an exchange
 
968
 
 
969
        This method binds a queue to an exchange.  Until a queue is
 
970
        bound it will not receive any messages.  In a classic
 
971
        messaging model, store-and-forward queues are bound to a dest
 
972
        exchange and subscription queues are bound to a dest_wild
 
973
        exchange.
 
974
 
 
975
        RULE:
 
976
 
 
977
            A server MUST allow ignore duplicate bindings - that is,
 
978
            two or more bind methods for a specific queue, with
 
979
            identical arguments - without treating these as an error.
 
980
 
 
981
        RULE:
 
982
 
 
983
            If a bind fails, the server MUST raise a connection
 
984
            exception.
 
985
 
 
986
        RULE:
 
987
 
 
988
            The server MUST NOT allow a durable queue to bind to a
 
989
            transient exchange. If the client attempts this the server
 
990
            MUST raise a channel exception.
 
991
 
 
992
        RULE:
 
993
 
 
994
            Bindings for durable queues are automatically durable and
 
995
            the server SHOULD restore such bindings after a server
 
996
            restart.
 
997
 
 
998
        RULE:
 
999
 
 
1000
            If the client attempts to an exchange that was declared as
 
1001
            internal, the server MUST raise a connection exception
 
1002
            with reply code 530 (not allowed).
 
1003
 
 
1004
        RULE:
 
1005
 
 
1006
            The server SHOULD support at least 4 bindings per queue,
 
1007
            and ideally, impose no limit except as defined by
 
1008
            available resources.
 
1009
 
 
1010
        PARAMETERS:
 
1011
            queue: shortstr
 
1012
 
 
1013
                Specifies the name of the queue to bind.  If the queue
 
1014
                name is empty, refers to the current queue for the
 
1015
                channel, which is the last declared queue.
 
1016
 
 
1017
                RULE:
 
1018
 
 
1019
                    If the client did not previously declare a queue,
 
1020
                    and the queue name in this method is empty, the
 
1021
                    server MUST raise a connection exception with
 
1022
                    reply code 530 (not allowed).
 
1023
 
 
1024
                RULE:
 
1025
 
 
1026
                    If the queue does not exist the server MUST raise
 
1027
                    a channel exception with reply code 404 (not
 
1028
                    found).
 
1029
 
 
1030
            exchange: shortstr
 
1031
 
 
1032
                The name of the exchange to bind to.
 
1033
 
 
1034
                RULE:
 
1035
 
 
1036
                    If the exchange does not exist the server MUST
 
1037
                    raise a channel exception with reply code 404 (not
 
1038
                    found).
 
1039
 
 
1040
            routing_key: shortstr
 
1041
 
 
1042
                message routing key
 
1043
 
 
1044
                Specifies the routing key for the binding.  The
 
1045
                routing key is used for routing messages depending on
 
1046
                the exchange configuration. Not all exchanges use a
 
1047
                routing key - refer to the specific exchange
 
1048
                documentation.  If the routing key is empty and the
 
1049
                queue name is empty, the routing key will be the
 
1050
                current queue for the channel, which is the last
 
1051
                declared queue.
 
1052
 
 
1053
            nowait: boolean
 
1054
 
 
1055
                do not send a reply method
 
1056
 
 
1057
                If set, the server will not respond to the method. The
 
1058
                client should not wait for a reply method.  If the
 
1059
                server could not complete the method it will raise a
 
1060
                channel or connection exception.
 
1061
 
 
1062
            arguments: table
 
1063
 
 
1064
                arguments for binding
 
1065
 
 
1066
                A set of arguments for the binding.  The syntax and
 
1067
                semantics of these arguments depends on the exchange
 
1068
                class.
 
1069
 
 
1070
            ticket: short
 
1071
 
 
1072
                The client provides a valid access ticket giving
 
1073
                "active" access rights to the queue's access realm.
 
1074
 
 
1075
        """
 
1076
        if arguments is None:
 
1077
            arguments = {}
 
1078
 
 
1079
        args = AMQPWriter()
 
1080
        if ticket is not None:
 
1081
            args.write_short(ticket)
 
1082
        else:
 
1083
            args.write_short(self.default_ticket)
 
1084
        args.write_shortstr(queue)
 
1085
        args.write_shortstr(exchange)
 
1086
        args.write_shortstr(routing_key)
 
1087
        args.write_bit(nowait)
 
1088
        args.write_table(arguments)
 
1089
        self._send_method((50, 20), args)
 
1090
 
 
1091
        if not nowait:
 
1092
            return self.wait(allowed_methods=[
 
1093
                              (50, 21),    # Channel.queue_bind_ok
 
1094
                            ])
 
1095
 
 
1096
 
 
1097
    def _queue_bind_ok(self, args):
 
1098
        """
 
1099
        confirm bind successful
 
1100
 
 
1101
        This method confirms that the bind was successful.
 
1102
 
 
1103
        """
 
1104
        pass
 
1105
 
 
1106
 
 
1107
    def queue_declare(self, queue='', passive=False, durable=False,
 
1108
        exclusive=False, auto_delete=True, nowait=False,
 
1109
        arguments=None, ticket=None):
 
1110
        """
 
1111
        declare queue, create if needed
 
1112
 
 
1113
        This method creates or checks a queue.  When creating a new
 
1114
        queue the client can specify various properties that control
 
1115
        the durability of the queue and its contents, and the level of
 
1116
        sharing for the queue.
 
1117
 
 
1118
        RULE:
 
1119
 
 
1120
            The server MUST create a default binding for a newly-
 
1121
            created queue to the default exchange, which is an
 
1122
            exchange of type 'direct'.
 
1123
 
 
1124
        RULE:
 
1125
 
 
1126
            The server SHOULD support a minimum of 256 queues per
 
1127
            virtual host and ideally, impose no limit except as
 
1128
            defined by available resources.
 
1129
 
 
1130
        PARAMETERS:
 
1131
            queue: shortstr
 
1132
 
 
1133
                RULE:
 
1134
 
 
1135
                    The queue name MAY be empty, in which case the
 
1136
                    server MUST create a new queue with a unique
 
1137
                    generated name and return this to the client in
 
1138
                    the Declare-Ok method.
 
1139
 
 
1140
                RULE:
 
1141
 
 
1142
                    Queue names starting with "amq." are reserved for
 
1143
                    predeclared and standardised server queues.  If
 
1144
                    the queue name starts with "amq." and the passive
 
1145
                    option is False, the server MUST raise a connection
 
1146
                    exception with reply code 403 (access refused).
 
1147
 
 
1148
            passive: boolean
 
1149
 
 
1150
                do not create queue
 
1151
 
 
1152
                If set, the server will not create the queue.  The
 
1153
                client can use this to check whether a queue exists
 
1154
                without modifying the server state.
 
1155
 
 
1156
                RULE:
 
1157
 
 
1158
                    If set, and the queue does not already exist, the
 
1159
                    server MUST respond with a reply code 404 (not
 
1160
                    found) and raise a channel exception.
 
1161
 
 
1162
            durable: boolean
 
1163
 
 
1164
                request a durable queue
 
1165
 
 
1166
                If set when creating a new queue, the queue will be
 
1167
                marked as durable.  Durable queues remain active when
 
1168
                a server restarts. Non-durable queues (transient
 
1169
                queues) are purged if/when a server restarts.  Note
 
1170
                that durable queues do not necessarily hold persistent
 
1171
                messages, although it does not make sense to send
 
1172
                persistent messages to a transient queue.
 
1173
 
 
1174
                RULE:
 
1175
 
 
1176
                    The server MUST recreate the durable queue after a
 
1177
                    restart.
 
1178
 
 
1179
                RULE:
 
1180
 
 
1181
                    The server MUST support both durable and transient
 
1182
                    queues.
 
1183
 
 
1184
                RULE:
 
1185
 
 
1186
                    The server MUST ignore the durable field if the
 
1187
                    queue already exists.
 
1188
 
 
1189
            exclusive: boolean
 
1190
 
 
1191
                request an exclusive queue
 
1192
 
 
1193
                Exclusive queues may only be consumed from by the
 
1194
                current connection. Setting the 'exclusive' flag
 
1195
                always implies 'auto-delete'.
 
1196
 
 
1197
                RULE:
 
1198
 
 
1199
                    The server MUST support both exclusive (private)
 
1200
                    and non-exclusive (shared) queues.
 
1201
 
 
1202
                RULE:
 
1203
 
 
1204
                    The server MUST raise a channel exception if
 
1205
                    'exclusive' is specified and the queue already
 
1206
                    exists and is owned by a different connection.
 
1207
 
 
1208
            auto_delete: boolean
 
1209
 
 
1210
                auto-delete queue when unused
 
1211
 
 
1212
                If set, the queue is deleted when all consumers have
 
1213
                finished using it. Last consumer can be cancelled
 
1214
                either explicitly or because its channel is closed. If
 
1215
                there was no consumer ever on the queue, it won't be
 
1216
                deleted.
 
1217
 
 
1218
                RULE:
 
1219
 
 
1220
                    The server SHOULD allow for a reasonable delay
 
1221
                    between the point when it determines that a queue
 
1222
                    is not being used (or no longer used), and the
 
1223
                    point when it deletes the queue.  At the least it
 
1224
                    must allow a client to create a queue and then
 
1225
                    create a consumer to read from it, with a small
 
1226
                    but non-zero delay between these two actions.  The
 
1227
                    server should equally allow for clients that may
 
1228
                    be disconnected prematurely, and wish to re-
 
1229
                    consume from the same queue without losing
 
1230
                    messages.  We would recommend a configurable
 
1231
                    timeout, with a suitable default value being one
 
1232
                    minute.
 
1233
 
 
1234
                RULE:
 
1235
 
 
1236
                    The server MUST ignore the auto-delete field if
 
1237
                    the queue already exists.
 
1238
 
 
1239
            nowait: boolean
 
1240
 
 
1241
                do not send a reply method
 
1242
 
 
1243
                If set, the server will not respond to the method. The
 
1244
                client should not wait for a reply method.  If the
 
1245
                server could not complete the method it will raise a
 
1246
                channel or connection exception.
 
1247
 
 
1248
            arguments: table
 
1249
 
 
1250
                arguments for declaration
 
1251
 
 
1252
                A set of arguments for the declaration. The syntax and
 
1253
                semantics of these arguments depends on the server
 
1254
                implementation.  This field is ignored if passive is
 
1255
                True.
 
1256
 
 
1257
            ticket: short
 
1258
 
 
1259
                When a client defines a new queue, this belongs to the
 
1260
                access realm of the ticket used.  All further work
 
1261
                done with that queue must be done with an access
 
1262
                ticket for the same realm.
 
1263
 
 
1264
                The client provides a valid access ticket giving
 
1265
                "active" access to the realm in which the queue exists
 
1266
                or will be created, or "passive" access if the if-
 
1267
                exists flag is set.
 
1268
 
 
1269
        Returns a tuple containing 3 items:
 
1270
            the name of the queue (essential for automatically-named queues)
 
1271
            message count
 
1272
            consumer count
 
1273
 
 
1274
        """
 
1275
        if arguments is None:
 
1276
            arguments = {}
 
1277
 
 
1278
        args = AMQPWriter()
 
1279
        if ticket is not None:
 
1280
            args.write_short(ticket)
 
1281
        else:
 
1282
            args.write_short(self.default_ticket)
 
1283
        args.write_shortstr(queue)
 
1284
        args.write_bit(passive)
 
1285
        args.write_bit(durable)
 
1286
        args.write_bit(exclusive)
 
1287
        args.write_bit(auto_delete)
 
1288
        args.write_bit(nowait)
 
1289
        args.write_table(arguments)
 
1290
        self._send_method((50, 10), args)
 
1291
 
 
1292
        if not nowait:
 
1293
            return self.wait(allowed_methods=[
 
1294
                              (50, 11),    # Channel.queue_declare_ok
 
1295
                            ])
 
1296
 
 
1297
 
 
1298
    def _queue_declare_ok(self, args):
 
1299
        """
 
1300
        confirms a queue definition
 
1301
 
 
1302
        This method confirms a Declare method and confirms the name of
 
1303
        the queue, essential for automatically-named queues.
 
1304
 
 
1305
        PARAMETERS:
 
1306
            queue: shortstr
 
1307
 
 
1308
                Reports the name of the queue. If the server generated
 
1309
                a queue name, this field contains that name.
 
1310
 
 
1311
            message_count: long
 
1312
 
 
1313
                number of messages in queue
 
1314
 
 
1315
                Reports the number of messages in the queue, which
 
1316
                will be zero for newly-created queues.
 
1317
 
 
1318
            consumer_count: long
 
1319
 
 
1320
                number of consumers
 
1321
 
 
1322
                Reports the number of active consumers for the queue.
 
1323
                Note that consumers can suspend activity
 
1324
                (Channel.Flow) in which case they do not appear in
 
1325
                this count.
 
1326
 
 
1327
        """
 
1328
        queue = args.read_shortstr()
 
1329
        message_count = args.read_long()
 
1330
        consumer_count = args.read_long()
 
1331
 
 
1332
        return queue, message_count, consumer_count
 
1333
 
 
1334
 
 
1335
    def queue_delete(self, queue='', if_unused=False, if_empty=False,
 
1336
        nowait=False, ticket=None):
 
1337
        """
 
1338
        delete a queue
 
1339
 
 
1340
        This method deletes a queue.  When a queue is deleted any
 
1341
        pending messages are sent to a dead-letter queue if this is
 
1342
        defined in the server configuration, and all consumers on the
 
1343
        queue are cancelled.
 
1344
 
 
1345
        RULE:
 
1346
 
 
1347
            The server SHOULD use a dead-letter queue to hold messages
 
1348
            that were pending on a deleted queue, and MAY provide
 
1349
            facilities for a system administrator to move these
 
1350
            messages back to an active queue.
 
1351
 
 
1352
        PARAMETERS:
 
1353
            queue: shortstr
 
1354
 
 
1355
                Specifies the name of the queue to delete. If the
 
1356
                queue name is empty, refers to the current queue for
 
1357
                the channel, which is the last declared queue.
 
1358
 
 
1359
                RULE:
 
1360
 
 
1361
                    If the client did not previously declare a queue,
 
1362
                    and the queue name in this method is empty, the
 
1363
                    server MUST raise a connection exception with
 
1364
                    reply code 530 (not allowed).
 
1365
 
 
1366
                RULE:
 
1367
 
 
1368
                    The queue must exist. Attempting to delete a non-
 
1369
                    existing queue causes a channel exception.
 
1370
 
 
1371
            if_unused: boolean
 
1372
 
 
1373
                delete only if unused
 
1374
 
 
1375
                If set, the server will only delete the queue if it
 
1376
                has no consumers. If the queue has consumers the
 
1377
                server does does not delete it but raises a channel
 
1378
                exception instead.
 
1379
 
 
1380
                RULE:
 
1381
 
 
1382
                    The server MUST respect the if-unused flag when
 
1383
                    deleting a queue.
 
1384
 
 
1385
            if_empty: boolean
 
1386
 
 
1387
                delete only if empty
 
1388
 
 
1389
                If set, the server will only delete the queue if it
 
1390
                has no messages. If the queue is not empty the server
 
1391
                raises a channel exception.
 
1392
 
 
1393
            nowait: boolean
 
1394
 
 
1395
                do not send a reply method
 
1396
 
 
1397
                If set, the server will not respond to the method. The
 
1398
                client should not wait for a reply method.  If the
 
1399
                server could not complete the method it will raise a
 
1400
                channel or connection exception.
 
1401
 
 
1402
            ticket: short
 
1403
 
 
1404
                The client provides a valid access ticket giving
 
1405
                "active" access rights to the queue's access realm.
 
1406
 
 
1407
        """
 
1408
        args = AMQPWriter()
 
1409
        if ticket is not None:
 
1410
            args.write_short(ticket)
 
1411
        else:
 
1412
            args.write_short(self.default_ticket)
 
1413
 
 
1414
        args.write_shortstr(queue)
 
1415
        args.write_bit(if_unused)
 
1416
        args.write_bit(if_empty)
 
1417
        args.write_bit(nowait)
 
1418
        self._send_method((50, 40), args)
 
1419
 
 
1420
        if not nowait:
 
1421
            return self.wait(allowed_methods=[
 
1422
                              (50, 41),    # Channel.queue_delete_ok
 
1423
                            ])
 
1424
 
 
1425
 
 
1426
    def _queue_delete_ok(self, args):
 
1427
        """
 
1428
        confirm deletion of a queue
 
1429
 
 
1430
        This method confirms the deletion of a queue.
 
1431
 
 
1432
        PARAMETERS:
 
1433
            message_count: long
 
1434
 
 
1435
                number of messages purged
 
1436
 
 
1437
                Reports the number of messages purged.
 
1438
 
 
1439
        """
 
1440
        return args.read_long()
 
1441
 
 
1442
 
 
1443
    def queue_purge(self, queue='', nowait=False, ticket=None):
 
1444
        """
 
1445
        purge a queue
 
1446
 
 
1447
        This method removes all messages from a queue.  It does not
 
1448
        cancel consumers.  Purged messages are deleted without any
 
1449
        formal "undo" mechanism.
 
1450
 
 
1451
        RULE:
 
1452
 
 
1453
            A call to purge MUST result in an empty queue.
 
1454
 
 
1455
        RULE:
 
1456
 
 
1457
            On transacted channels the server MUST not purge messages
 
1458
            that have already been sent to a client but not yet
 
1459
            acknowledged.
 
1460
 
 
1461
        RULE:
 
1462
 
 
1463
            The server MAY implement a purge queue or log that allows
 
1464
            system administrators to recover accidentally-purged
 
1465
            messages.  The server SHOULD NOT keep purged messages in
 
1466
            the same storage spaces as the live messages since the
 
1467
            volumes of purged messages may get very large.
 
1468
 
 
1469
        PARAMETERS:
 
1470
            queue: shortstr
 
1471
 
 
1472
                Specifies the name of the queue to purge.  If the
 
1473
                queue name is empty, refers to the current queue for
 
1474
                the channel, which is the last declared queue.
 
1475
 
 
1476
                RULE:
 
1477
 
 
1478
                    If the client did not previously declare a queue,
 
1479
                    and the queue name in this method is empty, the
 
1480
                    server MUST raise a connection exception with
 
1481
                    reply code 530 (not allowed).
 
1482
 
 
1483
                RULE:
 
1484
 
 
1485
                    The queue must exist. Attempting to purge a non-
 
1486
                    existing queue causes a channel exception.
 
1487
 
 
1488
            nowait: boolean
 
1489
 
 
1490
                do not send a reply method
 
1491
 
 
1492
                If set, the server will not respond to the method. The
 
1493
                client should not wait for a reply method.  If the
 
1494
                server could not complete the method it will raise a
 
1495
                channel or connection exception.
 
1496
 
 
1497
            ticket: short
 
1498
 
 
1499
                The access ticket must be for the access realm that
 
1500
                holds the queue.
 
1501
 
 
1502
                RULE:
 
1503
 
 
1504
                    The client MUST provide a valid access ticket
 
1505
                    giving "read" access rights to the queue's access
 
1506
                    realm.  Note that purging a queue is equivalent to
 
1507
                    reading all messages and discarding them.
 
1508
 
 
1509
        if nowait is False, returns a message_count
 
1510
 
 
1511
        """
 
1512
        args = AMQPWriter()
 
1513
        if ticket is not None:
 
1514
            args.write_short(ticket)
 
1515
        else:
 
1516
            args.write_short(self.default_ticket)
 
1517
        args.write_shortstr(queue)
 
1518
        args.write_bit(nowait)
 
1519
        self._send_method((50, 30), args)
 
1520
 
 
1521
        if not nowait:
 
1522
            return self.wait(allowed_methods=[
 
1523
                              (50, 31),    # Channel.queue_purge_ok
 
1524
                            ])
 
1525
 
 
1526
 
 
1527
    def _queue_purge_ok(self, args):
 
1528
        """
 
1529
        confirms a queue purge
 
1530
 
 
1531
        This method confirms the purge of a queue.
 
1532
 
 
1533
        PARAMETERS:
 
1534
            message_count: long
 
1535
 
 
1536
                number of messages purged
 
1537
 
 
1538
                Reports the number of messages purged.
 
1539
 
 
1540
        """
 
1541
        return args.read_long()
 
1542
 
 
1543
 
 
1544
    #############
 
1545
    #
 
1546
    #  Basic
 
1547
    #
 
1548
    #
 
1549
    # work with basic content
 
1550
    #
 
1551
    # The Basic class provides methods that support an industry-
 
1552
    # standard messaging model.
 
1553
    #
 
1554
    # GRAMMAR:
 
1555
    #
 
1556
    #     basic               = C:QOS S:QOS-OK
 
1557
    #                         / C:CONSUME S:CONSUME-OK
 
1558
    #                         / C:CANCEL S:CANCEL-OK
 
1559
    #                         / C:PUBLISH content
 
1560
    #                         / S:RETURN content
 
1561
    #                         / S:DELIVER content
 
1562
    #                         / C:GET ( S:GET-OK content / S:GET-EMPTY )
 
1563
    #                         / C:ACK
 
1564
    #                         / C:REJECT
 
1565
    #
 
1566
    # RULE:
 
1567
    #
 
1568
    #     The server SHOULD respect the persistent property of basic
 
1569
    #     messages and SHOULD make a best-effort to hold persistent
 
1570
    #     basic messages on a reliable storage mechanism.
 
1571
    #
 
1572
    # RULE:
 
1573
    #
 
1574
    #     The server MUST NOT discard a persistent basic message in
 
1575
    #     case of a queue overflow. The server MAY use the
 
1576
    #     Channel.Flow method to slow or stop a basic message
 
1577
    #     publisher when necessary.
 
1578
    #
 
1579
    # RULE:
 
1580
    #
 
1581
    #     The server MAY overflow non-persistent basic messages to
 
1582
    #     persistent storage and MAY discard or dead-letter non-
 
1583
    #     persistent basic messages on a priority basis if the queue
 
1584
    #     size exceeds some configured limit.
 
1585
    #
 
1586
    # RULE:
 
1587
    #
 
1588
    #     The server MUST implement at least 2 priority levels for
 
1589
    #     basic messages, where priorities 0-4 and 5-9 are treated as
 
1590
    #     two distinct levels. The server MAY implement up to 10
 
1591
    #     priority levels.
 
1592
    #
 
1593
    # RULE:
 
1594
    #
 
1595
    #     The server MUST deliver messages of the same priority in
 
1596
    #     order irrespective of their individual persistence.
 
1597
    #
 
1598
    # RULE:
 
1599
    #
 
1600
    #     The server MUST support both automatic and explicit
 
1601
    #     acknowledgements on Basic content.
 
1602
    #
 
1603
 
 
1604
    def basic_ack(self, delivery_tag, multiple=False):
 
1605
        """
 
1606
        acknowledge one or more messages
 
1607
 
 
1608
        This method acknowledges one or more messages delivered via
 
1609
        the Deliver or Get-Ok methods.  The client can ask to confirm
 
1610
        a single message or a set of messages up to and including a
 
1611
        specific message.
 
1612
 
 
1613
        PARAMETERS:
 
1614
            delivery_tag: longlong
 
1615
 
 
1616
                server-assigned delivery tag
 
1617
 
 
1618
                The server-assigned and channel-specific delivery tag
 
1619
 
 
1620
                RULE:
 
1621
 
 
1622
                    The delivery tag is valid only within the channel
 
1623
                    from which the message was received.  I.e. a client
 
1624
                    MUST NOT receive a message on one channel and then
 
1625
                    acknowledge it on another.
 
1626
 
 
1627
                RULE:
 
1628
 
 
1629
                    The server MUST NOT use a zero value for delivery
 
1630
                    tags.  Zero is reserved for client use, meaning "all
 
1631
                    messages so far received".
 
1632
 
 
1633
            multiple: boolean
 
1634
 
 
1635
                acknowledge multiple messages
 
1636
 
 
1637
                If set to True, the delivery tag is treated as "up to
 
1638
                and including", so that the client can acknowledge
 
1639
                multiple messages with a single method.  If set to
 
1640
                False, the delivery tag refers to a single message.
 
1641
                If the multiple field is True, and the delivery tag
 
1642
                is zero, tells the server to acknowledge all
 
1643
                outstanding mesages.
 
1644
 
 
1645
                RULE:
 
1646
 
 
1647
                    The server MUST validate that a non-zero delivery-
 
1648
                    tag refers to an delivered message, and raise a
 
1649
                    channel exception if this is not the case.
 
1650
 
 
1651
        """
 
1652
        args = AMQPWriter()
 
1653
        args.write_longlong(delivery_tag)
 
1654
        args.write_bit(multiple)
 
1655
        self._send_method((60, 80), args)
 
1656
 
 
1657
 
 
1658
    def basic_cancel(self, consumer_tag, nowait=False):
 
1659
        """
 
1660
        end a queue consumer
 
1661
 
 
1662
        This method cancels a consumer. This does not affect already
 
1663
        delivered messages, but it does mean the server will not send
 
1664
        any more messages for that consumer.  The client may receive
 
1665
        an abitrary number of messages in between sending the cancel
 
1666
        method and receiving the cancel-ok reply.
 
1667
 
 
1668
        RULE:
 
1669
 
 
1670
            If the queue no longer exists when the client sends a
 
1671
            cancel command, or the consumer has been cancelled for
 
1672
            other reasons, this command has no effect.
 
1673
 
 
1674
        PARAMETERS:
 
1675
            consumer_tag: shortstr
 
1676
 
 
1677
                consumer tag
 
1678
 
 
1679
                Identifier for the consumer, valid within the current
 
1680
                connection.
 
1681
 
 
1682
                RULE:
 
1683
 
 
1684
                    The consumer tag is valid only within the channel
 
1685
                    from which the consumer was created. I.e. a client
 
1686
                    MUST NOT create a consumer in one channel and then
 
1687
                    use it in another.
 
1688
 
 
1689
            nowait: boolean
 
1690
 
 
1691
                do not send a reply method
 
1692
 
 
1693
                If set, the server will not respond to the method. The
 
1694
                client should not wait for a reply method.  If the
 
1695
                server could not complete the method it will raise a
 
1696
                channel or connection exception.
 
1697
 
 
1698
        """
 
1699
        args = AMQPWriter()
 
1700
        args.write_shortstr(consumer_tag)
 
1701
        args.write_bit(nowait)
 
1702
        self._send_method((60, 30), args)
 
1703
        return self.wait(allowed_methods=[
 
1704
                          (60, 31),    # Channel.basic_cancel_ok
 
1705
                        ])
 
1706
 
 
1707
 
 
1708
    def _basic_cancel_ok(self, args):
 
1709
        """
 
1710
        confirm a cancelled consumer
 
1711
 
 
1712
        This method confirms that the cancellation was completed.
 
1713
 
 
1714
        PARAMETERS:
 
1715
            consumer_tag: shortstr
 
1716
 
 
1717
                consumer tag
 
1718
 
 
1719
                Identifier for the consumer, valid within the current
 
1720
                connection.
 
1721
 
 
1722
                RULE:
 
1723
 
 
1724
                    The consumer tag is valid only within the channel
 
1725
                    from which the consumer was created. I.e. a client
 
1726
                    MUST NOT create a consumer in one channel and then
 
1727
                    use it in another.
 
1728
 
 
1729
        """
 
1730
        consumer_tag = args.read_shortstr()
 
1731
        del self.callbacks[consumer_tag]
 
1732
 
 
1733
 
 
1734
    def basic_consume(self, queue='', consumer_tag='', no_local=False,
 
1735
        no_ack=False, exclusive=False, nowait=False,
 
1736
        callback=None, ticket=None):
 
1737
        """
 
1738
        start a queue consumer
 
1739
 
 
1740
        This method asks the server to start a "consumer", which is a
 
1741
        transient request for messages from a specific queue.
 
1742
        Consumers last as long as the channel they were created on, or
 
1743
        until the client cancels them.
 
1744
 
 
1745
        RULE:
 
1746
 
 
1747
            The server SHOULD support at least 16 consumers per queue,
 
1748
            unless the queue was declared as private, and ideally,
 
1749
            impose no limit except as defined by available resources.
 
1750
 
 
1751
        PARAMETERS:
 
1752
            queue: shortstr
 
1753
 
 
1754
                Specifies the name of the queue to consume from.  If
 
1755
                the queue name is null, refers to the current queue
 
1756
                for the channel, which is the last declared queue.
 
1757
 
 
1758
                RULE:
 
1759
 
 
1760
                    If the client did not previously declare a queue,
 
1761
                    and the queue name in this method is empty, the
 
1762
                    server MUST raise a connection exception with
 
1763
                    reply code 530 (not allowed).
 
1764
 
 
1765
            consumer_tag: shortstr
 
1766
 
 
1767
                Specifies the identifier for the consumer. The
 
1768
                consumer tag is local to a connection, so two clients
 
1769
                can use the same consumer tags. If this field is empty
 
1770
                the server will generate a unique tag.
 
1771
 
 
1772
                RULE:
 
1773
 
 
1774
                    The tag MUST NOT refer to an existing consumer. If
 
1775
                    the client attempts to create two consumers with
 
1776
                    the same non-empty tag the server MUST raise a
 
1777
                    connection exception with reply code 530 (not
 
1778
                    allowed).
 
1779
 
 
1780
            no_local: boolean
 
1781
 
 
1782
                do not deliver own messages
 
1783
 
 
1784
                If the no-local field is set the server will not send
 
1785
                messages to the client that published them.
 
1786
 
 
1787
            no_ack: boolean
 
1788
 
 
1789
                no acknowledgement needed
 
1790
 
 
1791
                If this field is set the server does not expect
 
1792
                acknowledgments for messages.  That is, when a message
 
1793
                is delivered to the client the server automatically and
 
1794
                silently acknowledges it on behalf of the client.  This
 
1795
                functionality increases performance but at the cost of
 
1796
                reliability.  Messages can get lost if a client dies
 
1797
                before it can deliver them to the application.
 
1798
 
 
1799
            exclusive: boolean
 
1800
 
 
1801
                request exclusive access
 
1802
 
 
1803
                Request exclusive consumer access, meaning only this
 
1804
                consumer can access the queue.
 
1805
 
 
1806
                RULE:
 
1807
 
 
1808
                    If the server cannot grant exclusive access to the
 
1809
                    queue when asked, - because there are other
 
1810
                    consumers active - it MUST raise a channel
 
1811
                    exception with return code 403 (access refused).
 
1812
 
 
1813
            nowait: boolean
 
1814
 
 
1815
                do not send a reply method
 
1816
 
 
1817
                If set, the server will not respond to the method. The
 
1818
                client should not wait for a reply method.  If the
 
1819
                server could not complete the method it will raise a
 
1820
                channel or connection exception.
 
1821
 
 
1822
            callback: Python callable
 
1823
 
 
1824
                function/method called with each delivered message
 
1825
 
 
1826
                For each message delivered by the broker, the
 
1827
                callable will be called with a Message object
 
1828
                as the single argument.  If no callable is specified,
 
1829
                messages are quietly discarded, no_ack should probably
 
1830
                be set to True in that case.
 
1831
 
 
1832
            ticket: short
 
1833
 
 
1834
                RULE:
 
1835
 
 
1836
                    The client MUST provide a valid access ticket
 
1837
                    giving "read" access rights to the realm for the
 
1838
                    queue.
 
1839
 
 
1840
        """
 
1841
        args = AMQPWriter()
 
1842
        if ticket is not None:
 
1843
            args.write_short(ticket)
 
1844
        else:
 
1845
            args.write_short(self.default_ticket)
 
1846
        args.write_shortstr(queue)
 
1847
        args.write_shortstr(consumer_tag)
 
1848
        args.write_bit(no_local)
 
1849
        args.write_bit(no_ack)
 
1850
        args.write_bit(exclusive)
 
1851
        args.write_bit(nowait)
 
1852
        self._send_method((60, 20), args)
 
1853
 
 
1854
        if not nowait:
 
1855
            consumer_tag = self.wait(allowed_methods=[
 
1856
                              (60, 21),    # Channel.basic_consume_ok
 
1857
                            ])
 
1858
 
 
1859
        self.callbacks[consumer_tag] = callback
 
1860
 
 
1861
        return consumer_tag
 
1862
 
 
1863
 
 
1864
    def _basic_consume_ok(self, args):
 
1865
        """
 
1866
        confirm a new consumer
 
1867
 
 
1868
        The server provides the client with a consumer tag, which is
 
1869
        used by the client for methods called on the consumer at a
 
1870
        later stage.
 
1871
 
 
1872
        PARAMETERS:
 
1873
            consumer_tag: shortstr
 
1874
 
 
1875
                Holds the consumer tag specified by the client or
 
1876
                provided by the server.
 
1877
 
 
1878
        """
 
1879
        return args.read_shortstr()
 
1880
 
 
1881
 
 
1882
    def _basic_deliver(self, args, msg):
 
1883
        """
 
1884
        notify the client of a consumer message
 
1885
 
 
1886
        This method delivers a message to the client, via a consumer.
 
1887
        In the asynchronous message delivery model, the client starts
 
1888
        a consumer using the Consume method, then the server responds
 
1889
        with Deliver methods as and when messages arrive for that
 
1890
        consumer.
 
1891
 
 
1892
        RULE:
 
1893
 
 
1894
            The server SHOULD track the number of times a message has
 
1895
            been delivered to clients and when a message is
 
1896
            redelivered a certain number of times - e.g. 5 times -
 
1897
            without being acknowledged, the server SHOULD consider the
 
1898
            message to be unprocessable (possibly causing client
 
1899
            applications to abort), and move the message to a dead
 
1900
            letter queue.
 
1901
 
 
1902
        PARAMETERS:
 
1903
            consumer_tag: shortstr
 
1904
 
 
1905
                consumer tag
 
1906
 
 
1907
                Identifier for the consumer, valid within the current
 
1908
                connection.
 
1909
 
 
1910
                RULE:
 
1911
 
 
1912
                    The consumer tag is valid only within the channel
 
1913
                    from which the consumer was created. I.e. a client
 
1914
                    MUST NOT create a consumer in one channel and then
 
1915
                    use it in another.
 
1916
 
 
1917
            delivery_tag: longlong
 
1918
 
 
1919
                server-assigned delivery tag
 
1920
 
 
1921
                The server-assigned and channel-specific delivery tag
 
1922
 
 
1923
                RULE:
 
1924
 
 
1925
                    The delivery tag is valid only within the channel
 
1926
                    from which the message was received.  I.e. a client
 
1927
                    MUST NOT receive a message on one channel and then
 
1928
                    acknowledge it on another.
 
1929
 
 
1930
                RULE:
 
1931
 
 
1932
                    The server MUST NOT use a zero value for delivery
 
1933
                    tags.  Zero is reserved for client use, meaning "all
 
1934
                    messages so far received".
 
1935
 
 
1936
            redelivered: boolean
 
1937
 
 
1938
                message is being redelivered
 
1939
 
 
1940
                This indicates that the message has been previously
 
1941
                delivered to this or another client.
 
1942
 
 
1943
            exchange: shortstr
 
1944
 
 
1945
                Specifies the name of the exchange that the message
 
1946
                was originally published to.
 
1947
 
 
1948
            routing_key: shortstr
 
1949
 
 
1950
                Message routing key
 
1951
 
 
1952
                Specifies the routing key name specified when the
 
1953
                message was published.
 
1954
 
 
1955
        """
 
1956
        consumer_tag = args.read_shortstr()
 
1957
        delivery_tag = args.read_longlong()
 
1958
        redelivered = args.read_bit()
 
1959
        exchange = args.read_shortstr()
 
1960
        routing_key = args.read_shortstr()
 
1961
 
 
1962
        msg.delivery_info = {
 
1963
            'channel': self,
 
1964
            'consumer_tag': consumer_tag,
 
1965
            'delivery_tag': delivery_tag,
 
1966
            'redelivered': redelivered,
 
1967
            'exchange': exchange,
 
1968
            'routing_key': routing_key,
 
1969
            }
 
1970
 
 
1971
        func = self.callbacks.get(consumer_tag, None)
 
1972
        if func is not None:
 
1973
            func(msg)
 
1974
 
 
1975
 
 
1976
    def basic_get(self, queue='', no_ack=False, ticket=None):
 
1977
        """
 
1978
        direct access to a queue
 
1979
 
 
1980
        This method provides a direct access to the messages in a
 
1981
        queue using a synchronous dialogue that is designed for
 
1982
        specific types of application where synchronous functionality
 
1983
        is more important than performance.
 
1984
 
 
1985
        PARAMETERS:
 
1986
            queue: shortstr
 
1987
 
 
1988
                Specifies the name of the queue to consume from.  If
 
1989
                the queue name is null, refers to the current queue
 
1990
                for the channel, which is the last declared queue.
 
1991
 
 
1992
                RULE:
 
1993
 
 
1994
                    If the client did not previously declare a queue,
 
1995
                    and the queue name in this method is empty, the
 
1996
                    server MUST raise a connection exception with
 
1997
                    reply code 530 (not allowed).
 
1998
 
 
1999
            no_ack: boolean
 
2000
 
 
2001
                no acknowledgement needed
 
2002
 
 
2003
                If this field is set the server does not expect
 
2004
                acknowledgments for messages.  That is, when a message
 
2005
                is delivered to the client the server automatically and
 
2006
                silently acknowledges it on behalf of the client.  This
 
2007
                functionality increases performance but at the cost of
 
2008
                reliability.  Messages can get lost if a client dies
 
2009
                before it can deliver them to the application.
 
2010
 
 
2011
            ticket: short
 
2012
 
 
2013
                RULE:
 
2014
 
 
2015
                    The client MUST provide a valid access ticket
 
2016
                    giving "read" access rights to the realm for the
 
2017
                    queue.
 
2018
 
 
2019
        Non-blocking, returns a message object, or None.
 
2020
 
 
2021
        """
 
2022
        args = AMQPWriter()
 
2023
        if ticket is not None:
 
2024
            args.write_short(ticket)
 
2025
        else:
 
2026
            args.write_short(self.default_ticket)
 
2027
        args.write_shortstr(queue)
 
2028
        args.write_bit(no_ack)
 
2029
        self._send_method((60, 70), args)
 
2030
        return self.wait(allowed_methods=[
 
2031
                          (60, 71),    # Channel.basic_get_ok
 
2032
                          (60, 72),    # Channel.basic_get_empty
 
2033
                        ])
 
2034
 
 
2035
 
 
2036
    def _basic_get_empty(self, args):
 
2037
        """
 
2038
        indicate no messages available
 
2039
 
 
2040
        This method tells the client that the queue has no messages
 
2041
        available for the client.
 
2042
 
 
2043
        PARAMETERS:
 
2044
            cluster_id: shortstr
 
2045
 
 
2046
                Cluster id
 
2047
 
 
2048
                For use by cluster applications, should not be used by
 
2049
                client applications.
 
2050
 
 
2051
        """
 
2052
        cluster_id = args.read_shortstr()
 
2053
 
 
2054
 
 
2055
    def _basic_get_ok(self, args, msg):
 
2056
        """
 
2057
        provide client with a message
 
2058
 
 
2059
        This method delivers a message to the client following a get
 
2060
        method.  A message delivered by 'get-ok' must be acknowledged
 
2061
        unless the no-ack option was set in the get method.
 
2062
 
 
2063
        PARAMETERS:
 
2064
            delivery_tag: longlong
 
2065
 
 
2066
                server-assigned delivery tag
 
2067
 
 
2068
                The server-assigned and channel-specific delivery tag
 
2069
 
 
2070
                RULE:
 
2071
 
 
2072
                    The delivery tag is valid only within the channel
 
2073
                    from which the message was received.  I.e. a client
 
2074
                    MUST NOT receive a message on one channel and then
 
2075
                    acknowledge it on another.
 
2076
 
 
2077
                RULE:
 
2078
 
 
2079
                    The server MUST NOT use a zero value for delivery
 
2080
                    tags.  Zero is reserved for client use, meaning "all
 
2081
                    messages so far received".
 
2082
 
 
2083
            redelivered: boolean
 
2084
 
 
2085
                message is being redelivered
 
2086
 
 
2087
                This indicates that the message has been previously
 
2088
                delivered to this or another client.
 
2089
 
 
2090
            exchange: shortstr
 
2091
 
 
2092
                Specifies the name of the exchange that the message
 
2093
                was originally published to.  If empty, the message
 
2094
                was published to the default exchange.
 
2095
 
 
2096
            routing_key: shortstr
 
2097
 
 
2098
                Message routing key
 
2099
 
 
2100
                Specifies the routing key name specified when the
 
2101
                message was published.
 
2102
 
 
2103
            message_count: long
 
2104
 
 
2105
                number of messages pending
 
2106
 
 
2107
                This field reports the number of messages pending on
 
2108
                the queue, excluding the message being delivered.
 
2109
                Note that this figure is indicative, not reliable, and
 
2110
                can change arbitrarily as messages are added to the
 
2111
                queue and removed by other clients.
 
2112
 
 
2113
        """
 
2114
        delivery_tag = args.read_longlong()
 
2115
        redelivered = args.read_bit()
 
2116
        exchange = args.read_shortstr()
 
2117
        routing_key = args.read_shortstr()
 
2118
        message_count = args.read_long()
 
2119
 
 
2120
        msg.delivery_info = {
 
2121
            'delivery_tag': delivery_tag,
 
2122
            'redelivered': redelivered,
 
2123
            'exchange': exchange,
 
2124
            'routing_key': routing_key,
 
2125
            'message_count': message_count
 
2126
            }
 
2127
 
 
2128
        return msg
 
2129
 
 
2130
 
 
2131
    def basic_publish(self, msg, exchange='', routing_key='',
 
2132
        mandatory=False, immediate=False, ticket=None):
 
2133
        """
 
2134
        publish a message
 
2135
 
 
2136
        This method publishes a message to a specific exchange. The
 
2137
        message will be routed to queues as defined by the exchange
 
2138
        configuration and distributed to any active consumers when the
 
2139
        transaction, if any, is committed.
 
2140
 
 
2141
        PARAMETERS:
 
2142
            exchange: shortstr
 
2143
 
 
2144
                Specifies the name of the exchange to publish to.  The
 
2145
                exchange name can be empty, meaning the default
 
2146
                exchange.  If the exchange name is specified, and that
 
2147
                exchange does not exist, the server will raise a
 
2148
                channel exception.
 
2149
 
 
2150
                RULE:
 
2151
 
 
2152
                    The server MUST accept a blank exchange name to
 
2153
                    mean the default exchange.
 
2154
 
 
2155
                RULE:
 
2156
 
 
2157
                    If the exchange was declared as an internal
 
2158
                    exchange, the server MUST raise a channel
 
2159
                    exception with a reply code 403 (access refused).
 
2160
 
 
2161
                RULE:
 
2162
 
 
2163
                    The exchange MAY refuse basic content in which
 
2164
                    case it MUST raise a channel exception with reply
 
2165
                    code 540 (not implemented).
 
2166
 
 
2167
            routing_key: shortstr
 
2168
 
 
2169
                Message routing key
 
2170
 
 
2171
                Specifies the routing key for the message.  The
 
2172
                routing key is used for routing messages depending on
 
2173
                the exchange configuration.
 
2174
 
 
2175
            mandatory: boolean
 
2176
 
 
2177
                indicate mandatory routing
 
2178
 
 
2179
                This flag tells the server how to react if the message
 
2180
                cannot be routed to a queue.  If this flag is True, the
 
2181
                server will return an unroutable message with a Return
 
2182
                method.  If this flag is False, the server silently
 
2183
                drops the message.
 
2184
 
 
2185
                RULE:
 
2186
 
 
2187
                    The server SHOULD implement the mandatory flag.
 
2188
 
 
2189
            immediate: boolean
 
2190
 
 
2191
                request immediate delivery
 
2192
 
 
2193
                This flag tells the server how to react if the message
 
2194
                cannot be routed to a queue consumer immediately.  If
 
2195
                this flag is set, the server will return an
 
2196
                undeliverable message with a Return method. If this
 
2197
                flag is zero, the server will queue the message, but
 
2198
                with no guarantee that it will ever be consumed.
 
2199
 
 
2200
                RULE:
 
2201
 
 
2202
                    The server SHOULD implement the immediate flag.
 
2203
 
 
2204
            ticket: short
 
2205
 
 
2206
                RULE:
 
2207
 
 
2208
                    The client MUST provide a valid access ticket
 
2209
                    giving "write" access rights to the access realm
 
2210
                    for the exchange.
 
2211
 
 
2212
        """
 
2213
        args = AMQPWriter()
 
2214
        if ticket is not None:
 
2215
            args.write_short(ticket)
 
2216
        else:
 
2217
            args.write_short(self.default_ticket)
 
2218
        args.write_shortstr(exchange)
 
2219
        args.write_shortstr(routing_key)
 
2220
        args.write_bit(mandatory)
 
2221
        args.write_bit(immediate)
 
2222
 
 
2223
        self._send_method((60, 40), args, msg)
 
2224
 
 
2225
 
 
2226
    def basic_qos(self, prefetch_size, prefetch_count, a_global):
 
2227
        """
 
2228
        specify quality of service
 
2229
 
 
2230
        This method requests a specific quality of service.  The QoS
 
2231
        can be specified for the current channel or for all channels
 
2232
        on the connection.  The particular properties and semantics of
 
2233
        a qos method always depend on the content class semantics.
 
2234
        Though the qos method could in principle apply to both peers,
 
2235
        it is currently meaningful only for the server.
 
2236
 
 
2237
        PARAMETERS:
 
2238
            prefetch_size: long
 
2239
 
 
2240
                prefetch window in octets
 
2241
 
 
2242
                The client can request that messages be sent in
 
2243
                advance so that when the client finishes processing a
 
2244
                message, the following message is already held
 
2245
                locally, rather than needing to be sent down the
 
2246
                channel.  Prefetching gives a performance improvement.
 
2247
                This field specifies the prefetch window size in
 
2248
                octets.  The server will send a message in advance if
 
2249
                it is equal to or smaller in size than the available
 
2250
                prefetch size (and also falls into other prefetch
 
2251
                limits). May be set to zero, meaning "no specific
 
2252
                limit", although other prefetch limits may still
 
2253
                apply. The prefetch-size is ignored if the no-ack
 
2254
                option is set.
 
2255
 
 
2256
                RULE:
 
2257
 
 
2258
                    The server MUST ignore this setting when the
 
2259
                    client is not processing any messages - i.e. the
 
2260
                    prefetch size does not limit the transfer of
 
2261
                    single messages to a client, only the sending in
 
2262
                    advance of more messages while the client still
 
2263
                    has one or more unacknowledged messages.
 
2264
 
 
2265
            prefetch_count: short
 
2266
 
 
2267
                prefetch window in messages
 
2268
 
 
2269
                Specifies a prefetch window in terms of whole
 
2270
                messages.  This field may be used in combination with
 
2271
                the prefetch-size field; a message will only be sent
 
2272
                in advance if both prefetch windows (and those at the
 
2273
                channel and connection level) allow it. The prefetch-
 
2274
                count is ignored if the no-ack option is set.
 
2275
 
 
2276
                RULE:
 
2277
 
 
2278
                    The server MAY send less data in advance than
 
2279
                    allowed by the client's specified prefetch windows
 
2280
                    but it MUST NOT send more.
 
2281
 
 
2282
            a_global: boolean
 
2283
 
 
2284
                apply to entire connection
 
2285
 
 
2286
                By default the QoS settings apply to the current
 
2287
                channel only.  If this field is set, they are applied
 
2288
                to the entire connection.
 
2289
 
 
2290
        """
 
2291
        args = AMQPWriter()
 
2292
        args.write_long(prefetch_size)
 
2293
        args.write_short(prefetch_count)
 
2294
        args.write_bit(a_global)
 
2295
        self._send_method((60, 10), args)
 
2296
        return self.wait(allowed_methods=[
 
2297
                          (60, 11),    # Channel.basic_qos_ok
 
2298
                        ])
 
2299
 
 
2300
 
 
2301
    def _basic_qos_ok(self, args):
 
2302
        """
 
2303
        confirm the requested qos
 
2304
 
 
2305
        This method tells the client that the requested QoS levels
 
2306
        could be handled by the server.  The requested QoS applies to
 
2307
        all active consumers until a new QoS is defined.
 
2308
 
 
2309
        """
 
2310
        pass
 
2311
 
 
2312
 
 
2313
    def basic_recover(self, requeue=False):
 
2314
        """
 
2315
        redeliver unacknowledged messages
 
2316
 
 
2317
        This method asks the broker to redeliver all unacknowledged
 
2318
        messages on a specified channel. Zero or more messages may be
 
2319
        redelivered.  This method is only allowed on non-transacted
 
2320
        channels.
 
2321
 
 
2322
        RULE:
 
2323
 
 
2324
            The server MUST set the redelivered flag on all messages
 
2325
            that are resent.
 
2326
 
 
2327
        RULE:
 
2328
 
 
2329
            The server MUST raise a channel exception if this is
 
2330
            called on a transacted channel.
 
2331
 
 
2332
        PARAMETERS:
 
2333
            requeue: boolean
 
2334
 
 
2335
                requeue the message
 
2336
 
 
2337
                If this field is False, the message will be redelivered
 
2338
                to the original recipient.  If this field is True, the
 
2339
                server will attempt to requeue the message,
 
2340
                potentially then delivering it to an alternative
 
2341
                subscriber.
 
2342
 
 
2343
        """
 
2344
        args = AMQPWriter()
 
2345
        args.write_bit(requeue)
 
2346
        self._send_method((60, 100), args)
 
2347
 
 
2348
 
 
2349
    def basic_reject(self, delivery_tag, requeue):
 
2350
        """
 
2351
        reject an incoming message
 
2352
 
 
2353
        This method allows a client to reject a message.  It can be
 
2354
        used to interrupt and cancel large incoming messages, or
 
2355
        return untreatable messages to their original queue.
 
2356
 
 
2357
        RULE:
 
2358
 
 
2359
            The server SHOULD be capable of accepting and process the
 
2360
            Reject method while sending message content with a Deliver
 
2361
            or Get-Ok method.  I.e. the server should read and process
 
2362
            incoming methods while sending output frames.  To cancel a
 
2363
            partially-send content, the server sends a content body
 
2364
            frame of size 1 (i.e. with no data except the frame-end
 
2365
            octet).
 
2366
 
 
2367
        RULE:
 
2368
 
 
2369
            The server SHOULD interpret this method as meaning that
 
2370
            the client is unable to process the message at this time.
 
2371
 
 
2372
        RULE:
 
2373
 
 
2374
            A client MUST NOT use this method as a means of selecting
 
2375
            messages to process.  A rejected message MAY be discarded
 
2376
            or dead-lettered, not necessarily passed to another
 
2377
            client.
 
2378
 
 
2379
        PARAMETERS:
 
2380
            delivery_tag: longlong
 
2381
 
 
2382
                server-assigned delivery tag
 
2383
 
 
2384
                The server-assigned and channel-specific delivery tag
 
2385
 
 
2386
                RULE:
 
2387
 
 
2388
                    The delivery tag is valid only within the channel
 
2389
                    from which the message was received.  I.e. a client
 
2390
                    MUST NOT receive a message on one channel and then
 
2391
                    acknowledge it on another.
 
2392
 
 
2393
                RULE:
 
2394
 
 
2395
                    The server MUST NOT use a zero value for delivery
 
2396
                    tags.  Zero is reserved for client use, meaning "all
 
2397
                    messages so far received".
 
2398
 
 
2399
            requeue: boolean
 
2400
 
 
2401
                requeue the message
 
2402
 
 
2403
                If this field is False, the message will be discarded.
 
2404
                If this field is True, the server will attempt to
 
2405
                requeue the message.
 
2406
 
 
2407
                RULE:
 
2408
 
 
2409
                    The server MUST NOT deliver the message to the
 
2410
                    same client within the context of the current
 
2411
                    channel.  The recommended strategy is to attempt
 
2412
                    to deliver the message to an alternative consumer,
 
2413
                    and if that is not possible, to move the message
 
2414
                    to a dead-letter queue.  The server MAY use more
 
2415
                    sophisticated tracking to hold the message on the
 
2416
                    queue and redeliver it to the same client at a
 
2417
                    later stage.
 
2418
 
 
2419
        """
 
2420
        args = AMQPWriter()
 
2421
        args.write_longlong(delivery_tag)
 
2422
        args.write_bit(requeue)
 
2423
        self._send_method((60, 90), args)
 
2424
 
 
2425
 
 
2426
    def _basic_return(self, args, msg):
 
2427
        """
 
2428
        return a failed message
 
2429
 
 
2430
        This method returns an undeliverable message that was
 
2431
        published with the "immediate" flag set, or an unroutable
 
2432
        message published with the "mandatory" flag set. The reply
 
2433
        code and text provide information about the reason that the
 
2434
        message was undeliverable.
 
2435
 
 
2436
        PARAMETERS:
 
2437
            reply_code: short
 
2438
 
 
2439
                The reply code. The AMQ reply codes are defined in AMQ
 
2440
                RFC 011.
 
2441
 
 
2442
            reply_text: shortstr
 
2443
 
 
2444
                The localised reply text.  This text can be logged as an
 
2445
                aid to resolving issues.
 
2446
 
 
2447
            exchange: shortstr
 
2448
 
 
2449
                Specifies the name of the exchange that the message
 
2450
                was originally published to.
 
2451
 
 
2452
            routing_key: shortstr
 
2453
 
 
2454
                Message routing key
 
2455
 
 
2456
                Specifies the routing key name specified when the
 
2457
                message was published.
 
2458
 
 
2459
        """
 
2460
        reply_code = args.read_short()
 
2461
        reply_text = args.read_shortstr()
 
2462
        exchange = args.read_shortstr()
 
2463
        routing_key = args.read_shortstr()
 
2464
 
 
2465
        self.returned_messages.put(
 
2466
            (reply_code, reply_text, exchange, routing_key, msg)
 
2467
            )
 
2468
 
 
2469
 
 
2470
    #############
 
2471
    #
 
2472
    #  Tx
 
2473
    #
 
2474
    #
 
2475
    # work with standard transactions
 
2476
    #
 
2477
    # Standard transactions provide so-called "1.5 phase commit".  We
 
2478
    # can ensure that work is never lost, but there is a chance of
 
2479
    # confirmations being lost, so that messages may be resent.
 
2480
    # Applications that use standard transactions must be able to
 
2481
    # detect and ignore duplicate messages.
 
2482
    #
 
2483
    # GRAMMAR:
 
2484
    #
 
2485
    #     tx                  = C:SELECT S:SELECT-OK
 
2486
    #                         / C:COMMIT S:COMMIT-OK
 
2487
    #                         / C:ROLLBACK S:ROLLBACK-OK
 
2488
    #
 
2489
    # RULE:
 
2490
    #
 
2491
    #     An client using standard transactions SHOULD be able to
 
2492
    #     track all messages received within a reasonable period, and
 
2493
    #     thus detect and reject duplicates of the same message. It
 
2494
    #     SHOULD NOT pass these to the application layer.
 
2495
    #
 
2496
    #
 
2497
 
 
2498
    def tx_commit(self):
 
2499
        """
 
2500
        commit the current transaction
 
2501
 
 
2502
        This method commits all messages published and acknowledged in
 
2503
        the current transaction.  A new transaction starts immediately
 
2504
        after a commit.
 
2505
 
 
2506
        """
 
2507
        self._send_method((90, 20))
 
2508
        return self.wait(allowed_methods=[
 
2509
                          (90, 21),    # Channel.tx_commit_ok
 
2510
                        ])
 
2511
 
 
2512
 
 
2513
    def _tx_commit_ok(self, args):
 
2514
        """
 
2515
        confirm a successful commit
 
2516
 
 
2517
        This method confirms to the client that the commit succeeded.
 
2518
        Note that if a commit fails, the server raises a channel
 
2519
        exception.
 
2520
 
 
2521
        """
 
2522
        pass
 
2523
 
 
2524
 
 
2525
    def tx_rollback(self):
 
2526
        """
 
2527
        abandon the current transaction
 
2528
 
 
2529
        This method abandons all messages published and acknowledged
 
2530
        in the current transaction.  A new transaction starts
 
2531
        immediately after a rollback.
 
2532
 
 
2533
        """
 
2534
        self._send_method((90, 30))
 
2535
        return self.wait(allowed_methods=[
 
2536
                          (90, 31),    # Channel.tx_rollback_ok
 
2537
                        ])
 
2538
 
 
2539
 
 
2540
    def _tx_rollback_ok(self, args):
 
2541
        """
 
2542
        confirm a successful rollback
 
2543
 
 
2544
        This method confirms to the client that the rollback
 
2545
        succeeded. Note that if an rollback fails, the server raises a
 
2546
        channel exception.
 
2547
 
 
2548
        """
 
2549
        pass
 
2550
 
 
2551
 
 
2552
    def tx_select(self):
 
2553
        """
 
2554
        select standard transaction mode
 
2555
 
 
2556
        This method sets the channel to use standard transactions.
 
2557
        The client must use this method at least once on a channel
 
2558
        before using the Commit or Rollback methods.
 
2559
 
 
2560
        """
 
2561
        self._send_method((90, 10))
 
2562
        return self.wait(allowed_methods=[
 
2563
                          (90, 11),    # Channel.tx_select_ok
 
2564
                        ])
 
2565
 
 
2566
 
 
2567
    def _tx_select_ok(self, args):
 
2568
        """
 
2569
        confirm transaction mode
 
2570
 
 
2571
        This method confirms to the client that the channel was
 
2572
        successfully set to use standard transactions.
 
2573
 
 
2574
        """
 
2575
        pass
 
2576
 
 
2577
 
 
2578
    _METHOD_MAP = {
 
2579
        (20, 11): _open_ok,
 
2580
        (20, 20): _flow,
 
2581
        (20, 21): _flow_ok,
 
2582
        (20, 30): _alert,
 
2583
        (20, 40): _close,
 
2584
        (20, 41): _close_ok,
 
2585
        (30, 11): _access_request_ok,
 
2586
        (40, 11): _exchange_declare_ok,
 
2587
        (40, 21): _exchange_delete_ok,
 
2588
        (50, 11): _queue_declare_ok,
 
2589
        (50, 21): _queue_bind_ok,
 
2590
        (50, 31): _queue_purge_ok,
 
2591
        (50, 41): _queue_delete_ok,
 
2592
        (60, 11): _basic_qos_ok,
 
2593
        (60, 21): _basic_consume_ok,
 
2594
        (60, 31): _basic_cancel_ok,
 
2595
        (60, 50): _basic_return,
 
2596
        (60, 60): _basic_deliver,
 
2597
        (60, 71): _basic_get_ok,
 
2598
        (60, 72): _basic_get_empty,
 
2599
        (90, 11): _tx_select_ok,
 
2600
        (90, 21): _tx_commit_ok,
 
2601
        (90, 31): _tx_rollback_ok,
 
2602
        }