3
Getting a connection to the AMQP server.
6
from amqplib.client_0_8.connection import AMQPConnectionException
7
from carrot.backends import get_backend_cls
11
# Default value for ``BrokerConnection.connect_timeout``.
DEFAULT_CONNECT_TIMEOUT = 5  # seconds

# Prefix of the current Django setting names (e.g. ``BROKER_PASSWORD``).
SETTING_PREFIX = "BROKER"

# Prefix of the deprecated Django setting names (e.g. ``AMQP_PASSWORD``).
COMPAT_SETTING_PREFIX = "AMQP"

# Maps a connection argument name to the suffix of the Django setting that
# supplies it; the suffix is combined with one of the prefixes above.
# NOTE(review): the hostname/userid/port entries follow the BROKER_HOST,
# BROKER_USER and BROKER_PORT settings named in the DjangoBrokerConnection
# documentation -- confirm against the upstream file.
ARG_TO_DJANGO_SETTING = {
    "hostname": "HOST",
    "userid": "USER",
    "password": "PASSWORD",
    "virtual_host": "VHOST",
    "port": "PORT",
}

# Deprecation message template: (old setting name, new setting name).
SETTING_DEPRECATED_FMT = "Setting %s has been renamed to %s and is " \
                         "scheduled for removal in version 1.0."
25
class BrokerConnection(object):
26
"""A network/socket connection to an AMQP message broker.
28
:param hostname: see :attr:`hostname`.
29
:param userid: see :attr:`userid`.
30
:param password: see :attr:`password`.
32
:keyword virtual_host: see :attr:`virtual_host`.
33
:keyword port: see :attr:`port`.
34
:keyword insist: see :attr:`insist`.
35
:keyword connect_timeout: see :attr:`connect_timeout`.
36
:keyword ssl: see :attr:`ssl`.
38
.. attribute:: hostname
40
The hostname to the AMQP server
44
A valid username used to authenticate to the server.
46
.. attribute:: password
48
The password used to authenticate to the server.
50
.. attribute:: virtual_host
52
The name of the virtual host to work with. This virtual host must
53
exist on the server, and the user must have access to it. Consult
54
your broker's manual for help with creating, and mapping
55
users to virtual hosts.
60
The port of the AMQP server. Default is ``5672`` (amqp).
64
Insist on connecting to a server. In a configuration with multiple
65
load-sharing servers, the insist option tells the server that the
66
client is insisting on a connection to the specified server.
69
.. attribute:: connect_timeout
71
The timeout in seconds before we give up connecting to the server.
72
The default is ``5`` seconds (``DEFAULT_CONNECT_TIMEOUT``).
76
Use SSL to connect to the server.
77
The default is ``False``.
79
.. attribute:: backend_cls
81
The messaging backend class used. Defaults to the ``pyamqplib``
88
connect_timeout = DEFAULT_CONNECT_TIMEOUT
93
ConnectionException = AMQPConnectionException
97
"""The host as a hostname/port pair separated by colon."""
98
return ":".join([self.hostname, str(self.port)])
100
def __init__(self, hostname=None, userid=None, password=None,
             virtual_host=None, port=None, **kwargs):
    """Store the connection parameters on the instance.

    No connection is opened here; this only records the settings.

    :param hostname: see :attr:`hostname`.
    :param userid: see :attr:`userid`.
    :param password: see :attr:`password`.
    :keyword virtual_host: see :attr:`virtual_host`.
    :keyword port: see :attr:`port`.
    :keyword insist: see :attr:`insist`.
    :keyword connect_timeout: see :attr:`connect_timeout`.
    :keyword ssl: see :attr:`ssl`.
    :keyword backend_cls: see :attr:`backend_cls`.

    """
    self.hostname = hostname
    # The userid argument must be kept, otherwise it is silently dropped.
    self.userid = userid
    self.password = password
    # A falsy argument falls back to the value already defined on the class.
    self.virtual_host = virtual_host or self.virtual_host
    self.port = port or self.port
    self.insist = kwargs.get("insist", self.insist)
    self.connect_timeout = kwargs.get("connect_timeout",
                                      self.connect_timeout)
    self.ssl = kwargs.get("ssl", self.ssl)
    self.backend_cls = kwargs.get("backend_cls", None)
    # The ``connection`` accessor reads ``_closed``; initialise it so the
    # first access does not raise AttributeError.
    self._closed = None
    self._connection = None
116
def connection(self):
117
if self._closed == True:
119
if not self._connection:
120
self._connection = self._establish_connection()
122
return self._connection
127
def __exit__(self, e_type, e_value, e_trace):
129
raise e_type(e_value)
132
def _establish_connection(self):
    """Open a connection through a newly created backend.

    Returns whatever the backend's ``establish_connection()`` returns.
    """
    return self.create_backend().establish_connection()
135
def get_backend_cls(self):
    """Get the currently used backend class.

    If :attr:`backend_cls` is unset or is a string, it is resolved with the
    module-level ``get_backend_cls`` helper imported from
    ``carrot.backends``; any other value is returned unchanged.
    """
    backend_cls = self.backend_cls
    if not backend_cls or isinstance(backend_cls, basestring):
        # Inside the method body this name is the imported helper, not
        # this method.
        backend_cls = get_backend_cls(backend_cls)
    # ``create_backend`` calls the returned value, so it must be returned.
    return backend_cls
142
def create_backend(self):
    """Create a new instance of the current backend in
    :attr:`backend_cls`.

    The backend is constructed with this object as its ``connection``
    keyword argument.
    """
    backend_cls = self.get_backend_cls()
    return backend_cls(connection=self)
148
def get_channel(self):
    """Request a new AMQP channel.

    Returns the result of calling ``channel()`` on :attr:`connection`.
    """
    return self.connection.channel()
153
"""Establish a connection to the AMQP server."""
155
return self.connection
158
"""Close the currently open connection."""
161
backend = self.create_backend()
162
backend.close_connection(self._connection)
167
# For backwards compatibility.
168
AMQPConnection = BrokerConnection
171
def get_django_conninfo():
    """Collect broker connection arguments from the Django settings.

    For every entry in ``ARG_TO_DJANGO_SETTING`` the ``BROKER_*`` setting
    is used when present. Otherwise the deprecated ``AMQP_*`` setting is
    used and a :exc:`DeprecationWarning` is issued. The deprecated
    ``AMQP_SERVER`` setting is accepted as a last resort for the hostname.

    :returns: a dict of keyword arguments; it always contains
        ``backend_cls`` (from ``settings.CARROT_BACKEND``, or ``None``).

    """
    # FIXME can't wait to remove this mess in 1.0 [askh]
    # Imported locally so the function does not rely on a module-level
    # import that may not be present.
    import warnings
    from django.conf import settings as django_settings

    ci = {}
    ci["backend_cls"] = getattr(django_settings, "CARROT_BACKEND", None)

    for arg_name, setting_name in ARG_TO_DJANGO_SETTING.items():
        setting = "%s_%s" % (SETTING_PREFIX, setting_name)
        compat_setting = "%s_%s" % (COMPAT_SETTING_PREFIX, setting_name)
        if hasattr(django_settings, setting):
            ci[arg_name] = getattr(django_settings, setting, None)
        elif hasattr(django_settings, compat_setting):
            ci[arg_name] = getattr(django_settings, compat_setting, None)
            warnings.warn(DeprecationWarning(SETTING_DEPRECATED_FMT % (
                compat_setting, setting)))

    if "hostname" not in ci:
        if hasattr(django_settings, "AMQP_SERVER"):
            ci["hostname"] = django_settings.AMQP_SERVER
            # The trailing space after "is" keeps the two literals from
            # joining into "isscheduled".
            warnings.warn(DeprecationWarning(
                "AMQP_SERVER has been renamed to BROKER_HOST and is "
                "scheduled for removal in version 1.0."))

    return ci
198
class DjangoBrokerConnection(BrokerConnection):
199
"""A version of :class:`BrokerConnection` that takes configuration
200
from the Django ``settings.py`` module.
202
:keyword hostname: The hostname of the AMQP server to connect to,
203
if not provided this is taken from ``settings.BROKER_HOST``.
205
:keyword userid: The username of the user to authenticate to the server
206
as. If not provided this is taken from ``settings.BROKER_USER``.
208
:keyword password: The user's password. If not provided this is taken
209
from ``settings.BROKER_PASSWORD``.
211
:keyword virtual_host: The name of the virtual host to work with.
212
This virtual host must exist on the server, and the user must
213
have access to it. Consult your broker's manual for help with
214
creating, and mapping users to virtual hosts. If not provided
215
this is taken from ``settings.BROKER_VHOST``.
217
:keyword port: The port the AMQP server is running on. If not provided
218
this is taken from ``settings.BROKER_PORT``, or if that is not set,
219
the default is ``5672`` (amqp).
224
def __init__(self, *args, **kwargs):
    """Initialise the connection with defaults from the Django settings.

    The values returned by ``get_django_conninfo()`` are used as defaults;
    explicitly passed keyword arguments override them.
    """
    kwargs = dict(get_django_conninfo(), **kwargs)
    super(DjangoBrokerConnection, self).__init__(*args, **kwargs)
228
# For backwards compatibility.
229
DjangoAMQPConnection = DjangoBrokerConnection