~ubuntu-branches/ubuntu/utopic/oslo.messaging/utopic

« back to all changes in this revision

Viewing changes to tests/test_notify_listener.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2014-03-27 13:01:34 UTC
  • mfrom: (1.1.3)
  • Revision ID: package-import@ubuntu.com-20140327130134-va1pxzs253r43n15
Tags: 1.3.0~a9-0ubuntu1
* New upstream release (LP: #1298970)
* debian/control:
  - Add python-oslotest as a build dependency.
  - Use python-oslosphinx instead of python-oslo.sphinx

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
 
 
2
# Copyright 2013 eNovance
 
3
#
 
4
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
5
#    not use this file except in compliance with the License. You may obtain
 
6
#    a copy of the License at
 
7
#
 
8
#         http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
12
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
13
#    License for the specific language governing permissions and limitations
 
14
#    under the License.
 
15
 
 
16
import threading
 
17
 
 
18
import mock
 
19
from oslo.config import cfg
 
20
import testscenarios
 
21
 
 
22
from oslo import messaging
 
23
from oslo.messaging.notify import dispatcher
 
24
from tests import utils as test_utils
 
25
 
 
26
load_tests = testscenarios.load_tests_apply_scenarios
 
27
 
 
28
 
 
29
class ListenerSetupMixin(object):
 
30
 
 
31
    class Listener(object):
 
32
        def __init__(self, transport, targets, endpoints, expect_messages):
 
33
            self._expect_messages = expect_messages
 
34
            self._received_msgs = 0
 
35
            self._listener = messaging.get_notification_listener(
 
36
                transport, targets, endpoints + [self], allow_requeue=True)
 
37
 
 
38
        def info(self, ctxt, publisher_id, event_type, payload):
 
39
            self._received_msgs += 1
 
40
            if self._expect_messages == self._received_msgs:
 
41
                # Check start() does nothing with a running listener
 
42
                self._listener.start()
 
43
                self._listener.stop()
 
44
                self._listener.wait()
 
45
 
 
46
        def start(self):
 
47
            self._listener.start()
 
48
 
 
49
    def _setup_listener(self, transport, endpoints, expect_messages,
 
50
                        targets=None):
 
51
        listener = self.Listener(transport,
 
52
                                 targets=targets or [
 
53
                                     messaging.Target(topic='testtopic')],
 
54
                                 expect_messages=expect_messages,
 
55
                                 endpoints=endpoints)
 
56
 
 
57
        thread = threading.Thread(target=listener.start)
 
58
        thread.daemon = True
 
59
        thread.start()
 
60
        return thread
 
61
 
 
62
    def _stop_listener(self, thread):
 
63
        thread.join(timeout=5)
 
64
 
 
65
    def _setup_notifier(self, transport, topic='testtopic',
 
66
                        publisher_id='testpublisher'):
 
67
        return messaging.Notifier(transport, topic=topic,
 
68
                                  driver='messaging',
 
69
                                  publisher_id=publisher_id)
 
70
 
 
71
 
 
72
class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
 
73
 
 
74
    def __init__(self, *args):
 
75
        super(TestNotifyListener, self).__init__(*args)
 
76
        ListenerSetupMixin.__init__(self)
 
77
 
 
78
    def setUp(self):
 
79
        super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
 
80
 
 
81
    def test_constructor(self):
 
82
        transport = messaging.get_transport(self.conf, url='fake:')
 
83
        target = messaging.Target(topic='foo')
 
84
        endpoints = [object()]
 
85
 
 
86
        listener = messaging.get_notification_listener(transport, [target],
 
87
                                                       endpoints)
 
88
 
 
89
        self.assertIs(listener.conf, self.conf)
 
90
        self.assertIs(listener.transport, transport)
 
91
        self.assertIsInstance(listener.dispatcher,
 
92
                              dispatcher.NotificationDispatcher)
 
93
        self.assertIs(listener.dispatcher.endpoints, endpoints)
 
94
        self.assertIs(listener.executor, 'blocking')
 
95
 
 
96
    def test_no_target_topic(self):
 
97
        transport = messaging.get_transport(self.conf, url='fake:')
 
98
 
 
99
        listener = messaging.get_notification_listener(transport,
 
100
                                                       [messaging.Target()],
 
101
                                                       [mock.Mock()])
 
102
        try:
 
103
            listener.start()
 
104
        except Exception as ex:
 
105
            self.assertIsInstance(ex, messaging.InvalidTarget, ex)
 
106
        else:
 
107
            self.assertTrue(False)
 
108
 
 
109
    def test_unknown_executor(self):
 
110
        transport = messaging.get_transport(self.conf, url='fake:')
 
111
 
 
112
        try:
 
113
            messaging.get_notification_listener(transport, [], [],
 
114
                                                executor='foo')
 
115
        except Exception as ex:
 
116
            self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
 
117
            self.assertEqual(ex.executor, 'foo')
 
118
        else:
 
119
            self.assertTrue(False)
 
120
 
 
121
    def test_one_topic(self):
 
122
        transport = messaging.get_transport(self.conf, url='fake:')
 
123
 
 
124
        endpoint = mock.Mock()
 
125
        endpoint.info.return_value = None
 
126
        listener_thread = self._setup_listener(transport, [endpoint], 1)
 
127
 
 
128
        notifier = self._setup_notifier(transport)
 
129
        notifier.info({}, 'an_event.start', 'test message')
 
130
 
 
131
        self._stop_listener(listener_thread)
 
132
 
 
133
        endpoint.info.assert_called_once_with(
 
134
            {}, 'testpublisher', 'an_event.start', 'test message',
 
135
            {'message_id': mock.ANY, 'timestamp': mock.ANY})
 
136
 
 
137
    def test_two_topics(self):
 
138
        transport = messaging.get_transport(self.conf, url='fake:')
 
139
 
 
140
        endpoint = mock.Mock()
 
141
        endpoint.info.return_value = None
 
142
        targets = [messaging.Target(topic="topic1"),
 
143
                   messaging.Target(topic="topic2")]
 
144
        listener_thread = self._setup_listener(transport, [endpoint], 2,
 
145
                                               targets=targets)
 
146
        notifier = self._setup_notifier(transport, topic='topic1')
 
147
        notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
 
148
        notifier = self._setup_notifier(transport, topic='topic2')
 
149
        notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')
 
150
 
 
151
        self._stop_listener(listener_thread)
 
152
 
 
153
        expected = [mock.call({'ctxt': '1'}, 'testpublisher',
 
154
                              'an_event.start1', 'test',
 
155
                              {'timestamp': mock.ANY, 'message_id': mock.ANY}),
 
156
                    mock.call({'ctxt': '2'}, 'testpublisher',
 
157
                              'an_event.start2', 'test',
 
158
                              {'timestamp': mock.ANY, 'message_id': mock.ANY})]
 
159
 
 
160
        self.assertEqual(sorted(endpoint.info.call_args_list), expected)
 
161
 
 
162
    def test_two_exchanges(self):
 
163
        transport = messaging.get_transport(self.conf, url='fake:')
 
164
 
 
165
        endpoint = mock.Mock()
 
166
        endpoint.info.return_value = None
 
167
        targets = [messaging.Target(topic="topic",
 
168
                                    exchange="exchange1"),
 
169
                   messaging.Target(topic="topic",
 
170
                                    exchange="exchange2")]
 
171
        listener_thread = self._setup_listener(transport, [endpoint], 3,
 
172
                                               targets=targets)
 
173
 
 
174
        notifier = self._setup_notifier(transport, topic="topic")
 
175
 
 
176
        def mock_notifier_exchange(name):
 
177
            def side_effect(target, ctxt, message, version):
 
178
                target.exchange = name
 
179
                return transport._driver.send_notification(target, ctxt,
 
180
                                                           message, version)
 
181
            transport._send_notification = mock.MagicMock(
 
182
                side_effect=side_effect)
 
183
 
 
184
        notifier.info({'ctxt': '0'},
 
185
                      'an_event.start', 'test message default exchange')
 
186
        mock_notifier_exchange('exchange1')
 
187
        notifier.info({'ctxt': '1'},
 
188
                      'an_event.start', 'test message exchange1')
 
189
        mock_notifier_exchange('exchange2')
 
190
        notifier.info({'ctxt': '2'},
 
191
                      'an_event.start', 'test message exchange2')
 
192
 
 
193
        self._stop_listener(listener_thread)
 
194
 
 
195
        expected = [mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start',
 
196
                              'test message exchange1',
 
197
                              {'timestamp': mock.ANY, 'message_id': mock.ANY}),
 
198
                    mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start',
 
199
                              'test message exchange2',
 
200
                              {'timestamp': mock.ANY, 'message_id': mock.ANY})]
 
201
        self.assertEqual(sorted(endpoint.info.call_args_list), expected)
 
202
 
 
203
    def test_two_endpoints(self):
 
204
        transport = messaging.get_transport(self.conf, url='fake:')
 
205
 
 
206
        endpoint1 = mock.Mock()
 
207
        endpoint1.info.return_value = None
 
208
        endpoint2 = mock.Mock()
 
209
        endpoint2.info.return_value = messaging.NotificationResult.HANDLED
 
210
        listener_thread = self._setup_listener(transport,
 
211
                                               [endpoint1, endpoint2], 1)
 
212
        notifier = self._setup_notifier(transport)
 
213
        notifier.info({}, 'an_event.start', 'test')
 
214
 
 
215
        self._stop_listener(listener_thread)
 
216
 
 
217
        endpoint1.info.assert_called_once_with(
 
218
            {}, 'testpublisher', 'an_event.start', 'test', {
 
219
                'timestamp': mock.ANY,
 
220
                'message_id': mock.ANY})
 
221
 
 
222
        endpoint2.info.assert_called_once_with(
 
223
            {}, 'testpublisher', 'an_event.start', 'test', {
 
224
                'timestamp': mock.ANY,
 
225
                'message_id': mock.ANY})
 
226
 
 
227
    def test_requeue(self):
 
228
        transport = messaging.get_transport(self.conf, url='fake:')
 
229
        endpoint = mock.Mock()
 
230
        endpoint.info = mock.Mock()
 
231
 
 
232
        def side_effect_requeue(*args, **kwargs):
 
233
            if endpoint.info.call_count == 1:
 
234
                return messaging.NotificationResult.REQUEUE
 
235
            return messaging.NotificationResult.HANDLED
 
236
 
 
237
        endpoint.info.side_effect = side_effect_requeue
 
238
        listener_thread = self._setup_listener(transport,
 
239
                                               [endpoint], 2)
 
240
        notifier = self._setup_notifier(transport)
 
241
        notifier.info({}, 'an_event.start', 'test')
 
242
 
 
243
        self._stop_listener(listener_thread)
 
244
 
 
245
        expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test',
 
246
                              {'timestamp': mock.ANY, 'message_id': mock.ANY}),
 
247
                    mock.call({}, 'testpublisher', 'an_event.start', 'test',
 
248
                              {'timestamp': mock.ANY, 'message_id': mock.ANY})]
 
249
        self.assertEqual(endpoint.info.call_args_list, expected)