2
# Copyright 2013 eNovance
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
19
from oslo.config import cfg
22
from oslo import messaging
23
from oslo.messaging.notify import dispatcher
24
from tests import utils as test_utils
26
load_tests = testscenarios.load_tests_apply_scenarios
29
class ListenerSetupMixin(object):
31
class Listener(object):
32
def __init__(self, transport, targets, endpoints, expect_messages):
    """Create the wrapped notification listener.

    :param transport: transport handed to
        ``messaging.get_notification_listener``
    :param targets: targets handed to the same factory
    :param endpoints: list of endpoint objects; this instance is appended
        to it, so it is passed to the listener as an endpoint too
    :param expect_messages: message count that the ``info`` callback
        compares against the number of notifications received so far
    """
    # Bookkeeping read by the ``info`` callback.
    self._received_msgs = 0
    self._expect_messages = expect_messages
    # The listener is created with requeueing allowed.
    self._listener = messaging.get_notification_listener(
        transport,
        targets,
        endpoints + [self],
        allow_requeue=True,
    )
38
def info(self, ctxt, publisher_id, event_type, payload):
39
self._received_msgs += 1
40
if self._expect_messages == self._received_msgs:
41
# Check start() does nothing with a running listener
42
self._listener.start()
47
self._listener.start()
49
def _setup_listener(self, transport, endpoints, expect_messages,
51
listener = self.Listener(transport,
53
messaging.Target(topic='testtopic')],
54
expect_messages=expect_messages,
57
thread = threading.Thread(target=listener.start)
62
def _stop_listener(self, thread, timeout=5):
    """Wait for a listener thread to finish and report whether it did.

    :param thread: the started thread object to join (for example one
        created with ``threading.Thread``)
    :param timeout: maximum number of seconds to wait; defaults to 5
    :returns: ``True`` if the thread is still running once the wait is
        over, ``False`` if it finished in time
    """
    thread.join(timeout=timeout)
    # join() returns None whether or not the timeout expired, so report
    # liveness explicitly. Callers that ignore the result are unaffected.
    return thread.is_alive()
65
def _setup_notifier(self, transport, topic='testtopic',
66
publisher_id='testpublisher'):
67
return messaging.Notifier(transport, topic=topic,
69
publisher_id=publisher_id)
72
class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
74
def __init__(self, *args):
    """Initialise the test case and then the listener mixin.

    :param args: positional arguments forwarded unchanged to the next
        ``__init__`` in the method resolution order
    """
    # First run the regular super() chain with the caller's arguments ...
    super(TestNotifyListener, self).__init__(*args)
    # ... then call the mixin's initialiser directly, without arguments.
    ListenerSetupMixin.__init__(self)
79
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
81
def test_constructor(self):
82
transport = messaging.get_transport(self.conf, url='fake:')
83
target = messaging.Target(topic='foo')
84
endpoints = [object()]
86
listener = messaging.get_notification_listener(transport, [target],
89
self.assertIs(listener.conf, self.conf)
90
self.assertIs(listener.transport, transport)
91
self.assertIsInstance(listener.dispatcher,
92
dispatcher.NotificationDispatcher)
93
self.assertIs(listener.dispatcher.endpoints, endpoints)
94
self.assertIs(listener.executor, 'blocking')
96
def test_no_target_topic(self):
97
transport = messaging.get_transport(self.conf, url='fake:')
99
listener = messaging.get_notification_listener(transport,
100
[messaging.Target()],
104
except Exception as ex:
105
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
107
self.assertTrue(False)
109
def test_unknown_executor(self):
110
transport = messaging.get_transport(self.conf, url='fake:')
113
messaging.get_notification_listener(transport, [], [],
115
except Exception as ex:
116
self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
117
self.assertEqual(ex.executor, 'foo')
119
self.assertTrue(False)
121
def test_one_topic(self):
    # One notification sent with the default notifier settings must be
    # delivered to the endpoint exactly once.
    fake_transport = messaging.get_transport(self.conf, url='fake:')

    receiver = mock.Mock()
    receiver.info.return_value = None
    worker = self._setup_listener(fake_transport, [receiver], 1)

    self._setup_notifier(fake_transport).info(
        {}, 'an_event.start', 'test message')

    self._stop_listener(worker)

    # The last positional argument is a metadata dict whose values are
    # not pinned by this test.
    expected_metadata = {'message_id': mock.ANY, 'timestamp': mock.ANY}
    receiver.info.assert_called_once_with(
        {}, 'testpublisher', 'an_event.start', 'test message',
        expected_metadata)
137
def test_two_topics(self):
138
transport = messaging.get_transport(self.conf, url='fake:')
140
endpoint = mock.Mock()
141
endpoint.info.return_value = None
142
targets = [messaging.Target(topic="topic1"),
143
messaging.Target(topic="topic2")]
144
listener_thread = self._setup_listener(transport, [endpoint], 2,
146
notifier = self._setup_notifier(transport, topic='topic1')
147
notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
148
notifier = self._setup_notifier(transport, topic='topic2')
149
notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')
151
self._stop_listener(listener_thread)
153
expected = [mock.call({'ctxt': '1'}, 'testpublisher',
154
'an_event.start1', 'test',
155
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
156
mock.call({'ctxt': '2'}, 'testpublisher',
157
'an_event.start2', 'test',
158
{'timestamp': mock.ANY, 'message_id': mock.ANY})]
160
self.assertEqual(sorted(endpoint.info.call_args_list), expected)
162
def test_two_exchanges(self):
163
transport = messaging.get_transport(self.conf, url='fake:')
165
endpoint = mock.Mock()
166
endpoint.info.return_value = None
167
targets = [messaging.Target(topic="topic",
168
exchange="exchange1"),
169
messaging.Target(topic="topic",
170
exchange="exchange2")]
171
listener_thread = self._setup_listener(transport, [endpoint], 3,
174
notifier = self._setup_notifier(transport, topic="topic")
176
def mock_notifier_exchange(name):
177
def side_effect(target, ctxt, message, version):
178
target.exchange = name
179
return transport._driver.send_notification(target, ctxt,
181
transport._send_notification = mock.MagicMock(
182
side_effect=side_effect)
184
notifier.info({'ctxt': '0'},
185
'an_event.start', 'test message default exchange')
186
mock_notifier_exchange('exchange1')
187
notifier.info({'ctxt': '1'},
188
'an_event.start', 'test message exchange1')
189
mock_notifier_exchange('exchange2')
190
notifier.info({'ctxt': '2'},
191
'an_event.start', 'test message exchange2')
193
self._stop_listener(listener_thread)
195
expected = [mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start',
196
'test message exchange1',
197
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
198
mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start',
199
'test message exchange2',
200
{'timestamp': mock.ANY, 'message_id': mock.ANY})]
201
self.assertEqual(sorted(endpoint.info.call_args_list), expected)
203
def test_two_endpoints(self):
    # A single notification must be delivered once to each of the two
    # endpoints, whichever value their ``info`` callback returns.
    fake_transport = messaging.get_transport(self.conf, url='fake:')

    first = mock.Mock()
    first.info.return_value = None
    second = mock.Mock()
    second.info.return_value = messaging.NotificationResult.HANDLED
    worker = self._setup_listener(fake_transport, [first, second], 1)
    notifier = self._setup_notifier(fake_transport)
    notifier.info({}, 'an_event.start', 'test')

    self._stop_listener(worker)

    # Same expectation for both endpoints, checked in registration order.
    for receiver in (first, second):
        receiver.info.assert_called_once_with(
            {}, 'testpublisher', 'an_event.start', 'test',
            {'timestamp': mock.ANY, 'message_id': mock.ANY})
227
def test_requeue(self):
228
transport = messaging.get_transport(self.conf, url='fake:')
229
endpoint = mock.Mock()
230
endpoint.info = mock.Mock()
232
def side_effect_requeue(*args, **kwargs):
233
if endpoint.info.call_count == 1:
234
return messaging.NotificationResult.REQUEUE
235
return messaging.NotificationResult.HANDLED
237
endpoint.info.side_effect = side_effect_requeue
238
listener_thread = self._setup_listener(transport,
240
notifier = self._setup_notifier(transport)
241
notifier.info({}, 'an_event.start', 'test')
243
self._stop_listener(listener_thread)
245
expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test',
246
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
247
mock.call({}, 'testpublisher', 'an_event.start', 'test',
248
{'timestamp': mock.ANY, 'message_id': mock.ANY})]
249
self.assertEqual(endpoint.info.call_args_list, expected)