1
# Copyright 2012 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Rabbit messaging tests."""
6
from __future__ import (
15
from amqplib import client_0_8 as amqp
16
from fixtures import MonkeyPatch
17
from maasserver.rabbit import (
24
from maasserver.testing.factory import factory
25
from maastesting.testcase import TestCase
26
from rabbitfixture.server import RabbitServer
29
class RabbitTestCase(TestCase):
    """Base class for tests that talk to a RabbitMQ server fixture.

    `setUp` starts a `RabbitServer` fixture, keeps a reference to its
    runner environment, and patches `maasserver.rabbit.connect` so that
    it resolves to that environment's `get_connection`.
    """

    def setUp(self):
        """Start the rabbit fixture and redirect `maasserver.rabbit.connect`."""
        super(RabbitTestCase, self).setUp()
        self.rabbit_server = self.useFixture(RabbitServer())
        self.rabbit_env = self.rabbit_server.runner.environment
        # Replace the module-level connect callable with the fixture's own
        # connection factory for the duration of the test.
        patch = MonkeyPatch(
            "maasserver.rabbit.connect", self.rabbit_env.get_connection)
        self.useFixture(patch)

    def get_command_output(self, command):
        """Return the output of the given rabbit command.

        The command is coerced with `str()` and handed to the fixture
        environment's `rabbitctl`; the first element of its result is
        returned.
        """
        return self.rabbit_env.rabbitctl(str(command))[0]
44
class TestRabbitSession(RabbitTestCase):
    """Tests for the `RabbitSession` connection handling."""

    def test_session_connection(self):
        rabbit_session = RabbitSession()
        # Reading the `connection` property is what populates the private
        # `_connection` attribute.
        first_connection = rabbit_session.connection
        self.assertIsNotNone(rabbit_session._connection)
        # A second read must hand back the very same object.
        second_connection = rabbit_session.connection
        self.assertIs(first_connection, second_connection)

    def test_session_disconnect(self):
        rabbit_session = RabbitSession()
        # NOTE(review): the source listing skips a line at this point
        # (original numbering jumps from 56 to 58); presumably a disconnect
        # call belongs here -- confirm against the upstream file.
        self.assertIsNone(rabbit_session._connection)
61
class TestRabbitMessaging(RabbitTestCase):
    """Tests for the `RabbitMessaging` factory methods."""

    def test_messaging_getExchange(self):
        # The exchange must be a RabbitExchange that shares the messaging
        # object's session and carries the same exchange name.
        name = factory.getRandomString()
        rabbit_messaging = RabbitMessaging(name)
        produced = rabbit_messaging.getExchange()
        self.assertIsInstance(produced, RabbitExchange)
        self.assertEqual(rabbit_messaging._session, produced._session)
        self.assertEqual(name, produced.exchange_name)

    def test_messaging_getQueue(self):
        # The queue must be a RabbitQueue that shares the messaging
        # object's session and carries the same exchange name.
        name = factory.getRandomString()
        rabbit_messaging = RabbitMessaging(name)
        produced = rabbit_messaging.getQueue()
        self.assertIsInstance(produced, RabbitQueue)
        self.assertEqual(rabbit_messaging._session, produced._session)
        self.assertEqual(name, produced.exchange_name)
80
class TestRabbitBase(RabbitTestCase):
    """Tests for the `RabbitBase` session, exchange name and channel."""

    def test_rabbitbase_contains_session(self):
        exchange_name = factory.getRandomString()
        rabbitbase = RabbitBase(RabbitSession(), exchange_name)
        self.assertIsInstance(rabbitbase._session, RabbitSession)

    def test_base_has_exchange_name(self):
        exchange_name = factory.getRandomString()
        rabbitbase = RabbitBase(RabbitSession(), exchange_name)
        self.assertEqual(exchange_name, rabbitbase.exchange_name)

    def test_base_channel(self):
        rabbitbase = RabbitBase(RabbitSession(), factory.getRandomString())
        # Referencing the channel property causes an open channel to be
        # created.
        channel = rabbitbase.channel
        self.assertTrue(channel.is_open)
        self.assertIsNotNone(rabbitbase._session._connection)
        # The same channel is returned every time.
        self.assertIs(channel, rabbitbase.channel)

    def test_base_channel_creates_exchange(self):
        exchange_name = factory.getRandomString()
        rabbitbase = RabbitBase(RabbitSession(), exchange_name)
        # NOTE(review): the source listing dropped the lines between the
        # construction above and the `list_exchanges` call (it was left as
        # a dangling argument with an unmatched parenthesis).  The channel
        # access and assertIn below are reconstructed from the test name
        # and the surviving fragment -- confirm against the upstream file.
        rabbitbase.channel
        self.assertIn(
            exchange_name,
            self.get_command_output('list_exchanges'))
111
class TestRabbitExchange(RabbitTestCase):
    """Tests for publishing through `RabbitExchange`."""

    def test_exchange_publish(self):
        name = factory.getRandomString()
        payload = factory.getRandomString()
        publisher = RabbitExchange(RabbitSession(), name)
        # Use a separate session to declare an auto-delete queue and bind
        # it to the exchange before anything is published.
        consumer_base = RabbitBase(RabbitSession(), name)
        channel = consumer_base.channel
        declared = channel.queue_declare(auto_delete=True)
        bound_queue = declared[0]
        channel.queue_bind(exchange=name, queue=bound_queue)
        publisher.publish(payload)
        # The published body must be retrievable from the bound queue.
        received = channel.basic_get(bound_queue)
        self.assertEqual(payload, received.body)
126
class TestRabbitQueue(RabbitTestCase):
    """Tests for `RabbitQueue` receiving messages from its exchange."""

    def test_rabbit_queue_binds_queue(self):
        name = factory.getRandomString()
        payload = factory.getRandomString()
        rabbit_queue = RabbitQueue(RabbitSession(), name)
        # Publish to the exchange over an independent session; the message
        # is then fetched from rabbit_queue.name.
        channel = RabbitBase(RabbitSession(), name).channel
        channel.basic_publish(
            exchange=name, routing_key='', msg=amqp.Message(payload))
        received = channel.basic_get(rabbit_queue.name)
        self.assertEqual(payload, received.body)