2
Code common to Connection and Channel objects.
5
# Copyright (C) 2007-2008 Barry Pederson <bp@barryp.org>
7
# This library is free software; you can redistribute it and/or
8
# modify it under the terms of the GNU Lesser General Public
9
# License as published by the Free Software Foundation; either
10
# version 2.1 of the License, or (at your option) any later version.
12
# This library is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15
# Lesser General Public License for more details.
17
# You should have received a copy of the GNU Lesser General Public
18
# License along with this library; if not, write to the Free Software
19
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
21
from serialization import AMQPWriter
28
class AbstractChannel(object):
30
Superclass for both the Connection, which is treated
31
as channel 0, and other user-created Channel objects.
33
The subclasses must have a _METHOD_MAP class property, mapping
34
between AMQP method signatures and Python methods.
37
def __init__(self, connection, channel_id):
38
self.connection = connection
39
self.channel_id = channel_id
40
connection.channels[channel_id] = self
41
self.method_queue = [] # Higher level queue for methods
42
self.auto_decode = False
47
Support for Python >= 2.5 'with' statements.
53
def __exit__(self, type, value, traceback):
55
Support for Python >= 2.5 'with' statements.
61
def _send_method(self, method_sig, args='', content=None):
63
Send a method for our channel.
66
if isinstance(args, AMQPWriter):
67
args = args.getvalue()
69
self.connection.method_writer.write_method(self.channel_id,
70
method_sig, args, content)
75
Close this Channel or Connection
78
raise NotImplementedError('Must be overriden in subclass')
82
def wait(self, allowed_methods=None):
84
Wait for a method that matches our allowed_methods parameter (the
85
default value of None means match any method), and dispatch to it.
88
method_sig, args, content = self.connection._wait_method(
89
self.channel_id, allowed_methods)
92
and self.auto_decode \
93
and hasattr(content, 'content_encoding'):
95
content.body = content.body.decode(content.content_encoding)
99
amqp_method = self._METHOD_MAP.get(method_sig, None)
101
if amqp_method is None:
102
raise Exception('Unknown AMQP method (%d, %d)' % method_sig)
105
return amqp_method(self, args)
107
return amqp_method(self, args, content)
111
# Placeholder, the concrete implementations will have to
112
# supply their own versions of _METHOD_MAP