1
# Copyright 2012-2014 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Rabbit messaging tests."""
6
from __future__ import (
21
from amqplib import client_0_8 as amqp
22
from django.conf import settings
23
from maasserver.exceptions import NoRabbit
24
from maasserver.rabbit import (
31
from maasserver.testing.factory import factory
32
from maasserver.testing.rabbit import uses_rabbit_fixture
33
from maastesting.testcase import MAASTestCase
34
from testtools.testcase import ExpectedException
37
def run_rabbit_command(rabbit, command):
38
"""Run a Rabbit command through rabbitctl, and return its output."""
39
if isinstance(command, unicode):
40
command = command.encode('ascii')
41
rabbit_env = rabbit.runner.environment
42
return rabbit_env.rabbitctl(command)[0]
45
class TestRabbitSession(MAASTestCase):
48
def test_connection_gets_connection(self):
49
session = RabbitSession()
50
self.addCleanup(session.disconnect)
51
# Referencing the connection property causes a connection to be
53
connection = session.connection
54
self.assertIsNotNone(session._connection)
55
# The same connection is returned every time.
56
self.assertIs(connection, session.connection)
58
def test_connection_raises_NoRabbit_if_cannot_connect(self):
59
# Attempt to connect to a RabbitMQ on the local "discard"
60
# service. The connection will be refused.
61
self.patch(settings, 'RABBITMQ_HOST', 'localhost:9')
62
session = RabbitSession()
63
with ExpectedException(NoRabbit):
66
def test_connection_propagates_exceptions(self):
68
def fail(*args, **kwargs):
69
raise socket.error("Connection not refused, but failed anyway.")
71
self.patch(amqp, 'Connection', fail)
72
session = RabbitSession()
73
with ExpectedException(socket.error):
76
def test_disconnect(self):
77
session = RabbitSession()
79
self.assertIsNone(session._connection)
82
class TestRabbitMessaging(MAASTestCase):
85
def test_messaging_getExchange(self):
86
exchange_name = factory.make_string()
87
messaging = RabbitMessaging(exchange_name)
88
self.addCleanup(messaging._session.disconnect)
89
exchange = messaging.getExchange()
90
self.assertIsInstance(exchange, RabbitExchange)
91
self.assertEqual(messaging._session, exchange._session)
92
self.assertEqual(exchange_name, exchange.exchange_name)
95
def test_messaging_getQueue(self):
96
exchange_name = factory.make_string()
97
messaging = RabbitMessaging(exchange_name)
98
self.addCleanup(messaging._session.disconnect)
99
queue = messaging.getQueue()
100
self.assertIsInstance(queue, RabbitQueue)
101
self.assertEqual(messaging._session, queue._session)
102
self.assertEqual(exchange_name, queue.exchange_name)
105
class TestRabbitBase(MAASTestCase):
107
def test_rabbitbase_contains_session(self):
108
exchange_name = factory.make_string()
109
rabbitbase = RabbitBase(RabbitSession(), exchange_name)
110
self.assertIsInstance(rabbitbase._session, RabbitSession)
112
def test_base_has_exchange_name(self):
113
exchange_name = factory.make_string()
114
rabbitbase = RabbitBase(RabbitSession(), exchange_name)
115
self.assertEqual(exchange_name, rabbitbase.exchange_name)
118
def test_base_channel(self):
119
rabbitbase = RabbitBase(RabbitSession(), factory.make_string())
120
self.addCleanup(rabbitbase._session.disconnect)
121
# Referencing the channel property causes an open channel to be
123
channel = rabbitbase.channel
124
self.assertTrue(channel.is_open)
125
self.assertIsNotNone(rabbitbase._session._connection)
126
# The same channel is returned every time.
127
self.assertIs(channel, rabbitbase.channel)
130
def test_base_channel_creates_exchange(self):
131
exchange_name = factory.make_string()
132
rabbitbase = RabbitBase(RabbitSession(), exchange_name)
133
self.addCleanup(rabbitbase._session.disconnect)
137
run_rabbit_command(self.rabbit, 'list_exchanges'))
140
class TestRabbitExchange(MAASTestCase):
142
def basic_get(self, channel, queue_name, timeout):
143
endtime = time.time() + timeout
145
message = channel.basic_get(queue_name)
147
if time.time() > endtime:
148
self.fail('Cannot get message.')
154
def test_exchange_publish(self):
155
exchange_name = factory.make_string()
156
message_content = factory.make_string()
157
exchange = RabbitExchange(RabbitSession(), exchange_name)
158
self.addCleanup(exchange._session.disconnect)
160
rabbitbase = RabbitBase(RabbitSession(), exchange_name)
161
self.addCleanup(rabbitbase._session.disconnect)
162
channel = rabbitbase.channel
163
queue_name = channel.queue_declare(auto_delete=True)[0]
164
channel.queue_bind(exchange=exchange_name, queue=queue_name)
165
exchange.publish(message_content)
166
message = self.basic_get(channel, queue_name, timeout=2)
167
self.assertEqual(message_content, message.body)
170
class TestRabbitQueue(MAASTestCase):
173
def test_rabbit_queue_binds_queue(self):
174
exchange_name = factory.make_string()
175
message_content = factory.make_string()
176
queue = RabbitQueue(RabbitSession(), exchange_name)
177
self.addCleanup(queue._session.disconnect)
179
# Publish to queue.name.
180
base = RabbitBase(RabbitSession(), exchange_name)
181
self.addCleanup(base._session.disconnect)
182
channel = base.channel
183
msg = amqp.Message(message_content)
184
channel.basic_publish(
185
exchange=exchange_name, routing_key='', msg=msg)
186
message = channel.basic_get(queue.name)
187
self.assertEqual(message_content, message.body)