1
# Copyright 2012 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Rabbit messaging."""
6
from __future__ import (
23
from errno import ECONNREFUSED
27
from amqplib import client_0_8 as amqp
28
from django.conf import settings
29
from maasserver.exceptions import NoRabbit
33
"""Connect to AMQP."""
35
return amqp.Connection(
36
host=settings.RABBITMQ_HOST,
37
userid=settings.RABBITMQ_USERID,
38
password=settings.RABBITMQ_PASSWORD,
39
virtual_host=settings.RABBITMQ_VIRTUAL_HOST,
41
except socket.error as e:
42
if e.errno == ECONNREFUSED:
43
raise NoRabbit(e.message)
48
class RabbitSession(threading.local):
51
self._connection = None
55
if self._connection is None or self._connection.transport is None:
56
self._connection = connect()
57
return self._connection
60
if self._connection is not None:
62
self._connection.close()
64
self._connection = None
67
class RabbitMessaging:
69
def __init__(self, exchange_name):
70
self.exchange_name = exchange_name
71
self._session = RabbitSession()
73
def getExchange(self):
74
return RabbitExchange(self._session, self.exchange_name)
77
return RabbitQueue(self._session, self.exchange_name)
80
class RabbitBase(threading.local):
82
def __init__(self, session, exchange_name):
83
self.exchange_name = exchange_name
84
self._session = session
89
if self._channel is None or not self._channel.is_open:
90
self._channel = self._session.connection.channel()
91
self._channel.exchange_declare(
92
self.exchange_name, type='fanout')
96
class RabbitExchange(RabbitBase):
98
def publish(self, message):
99
msg = amqp.Message(message)
100
# Publish to a 'fanout' exchange: routing_key is ''.
101
self.channel.basic_publish(
102
exchange=self.exchange_name, routing_key='', msg=msg)
105
class RabbitQueue(RabbitBase):
107
def __init__(self, session, exchange_name):
108
super(RabbitQueue, self).__init__(session, exchange_name)
109
self.queue_name = self.channel.queue_declare(
110
nowait=False, auto_delete=False,
111
arguments={"x-expires": 300000})[0]
112
self.channel.queue_bind(
113
exchange=self.exchange_name, queue=self.queue_name)
117
return self.queue_name