~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/tests/rpc/test_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
# Copyright 2012, Red Hat, Inc.
7
 
#
8
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
9
 
#    not use this file except in compliance with the License. You may obtain
10
 
#    a copy of the License at
11
 
#
12
 
#         http://www.apache.org/licenses/LICENSE-2.0
13
 
#
14
 
#    Unless required by applicable law or agreed to in writing, software
15
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
 
#    License for the specific language governing permissions and limitations
18
 
#    under the License.
19
 
"""
20
 
Unit Tests for remote procedure calls using qpid
21
 
"""
22
 
 
23
 
import mox
24
 
 
25
 
from nova import context
26
 
from nova import flags
27
 
from nova import log as logging
28
 
from nova.rpc import amqp as rpc_amqp
29
 
from nova import test
30
 
 
31
 
try:
32
 
    from nova.rpc import impl_qpid
33
 
    import qpid
34
 
except ImportError:
35
 
    qpid = None
36
 
    impl_qpid = None
37
 
 
38
 
 
39
 
FLAGS = flags.FLAGS
40
 
LOG = logging.getLogger(__name__)
41
 
 
42
 
 
43
 
class RpcQpidTestCase(test.TestCase):
44
 
    """
45
 
    Exercise the public API of impl_qpid utilizing mox.
46
 
 
47
 
    This set of tests utilizes mox to replace the Qpid objects and ensures
48
 
    that the right operations happen on them when the various public rpc API
49
 
    calls are exercised.  The API calls tested here include:
50
 
 
51
 
        nova.rpc.create_connection()
52
 
        nova.rpc.common.Connection.create_consumer()
53
 
        nova.rpc.common.Connection.close()
54
 
        nova.rpc.cast()
55
 
        nova.rpc.fanout_cast()
56
 
        nova.rpc.call()
57
 
        nova.rpc.multicall()
58
 
    """
59
 
 
60
 
    def setUp(self):
61
 
        super(RpcQpidTestCase, self).setUp()
62
 
 
63
 
        self.mock_connection = None
64
 
        self.mock_session = None
65
 
        self.mock_sender = None
66
 
        self.mock_receiver = None
67
 
 
68
 
        if qpid:
69
 
            impl_qpid.register_opts(FLAGS)
70
 
            self.orig_connection = qpid.messaging.Connection
71
 
            self.orig_session = qpid.messaging.Session
72
 
            self.orig_sender = qpid.messaging.Sender
73
 
            self.orig_receiver = qpid.messaging.Receiver
74
 
            qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
75
 
            qpid.messaging.Session = lambda *_x, **_y: self.mock_session
76
 
            qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
77
 
            qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
78
 
 
79
 
    def tearDown(self):
80
 
        if qpid:
81
 
            qpid.messaging.Connection = self.orig_connection
82
 
            qpid.messaging.Session = self.orig_session
83
 
            qpid.messaging.Sender = self.orig_sender
84
 
            qpid.messaging.Receiver = self.orig_receiver
85
 
        if impl_qpid:
86
 
            # Need to reset this in case we changed the connection_cls
87
 
            # in self._setup_to_server_tests()
88
 
            impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
89
 
 
90
 
        super(RpcQpidTestCase, self).tearDown()
91
 
 
92
 
    @test.skip_if(qpid is None, "Test requires qpid")
93
 
    def test_create_connection(self):
94
 
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
95
 
        self.mock_session = self.mox.CreateMock(self.orig_session)
96
 
 
97
 
        self.mock_connection.opened().AndReturn(False)
98
 
        self.mock_connection.open()
99
 
        self.mock_connection.session().AndReturn(self.mock_session)
100
 
        self.mock_connection.close()
101
 
 
102
 
        self.mox.ReplayAll()
103
 
 
104
 
        connection = impl_qpid.create_connection(FLAGS)
105
 
        connection.close()
106
 
 
107
 
    def _test_create_consumer(self, fanout):
108
 
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
109
 
        self.mock_session = self.mox.CreateMock(self.orig_session)
110
 
        self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
111
 
 
112
 
        self.mock_connection.opened().AndReturn(False)
113
 
        self.mock_connection.open()
114
 
        self.mock_connection.session().AndReturn(self.mock_session)
115
 
        if fanout:
116
 
            # The link name includes a UUID, so match it with a regex.
117
 
            expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
118
 
                '{"node": {"x-declare": {"auto-delete": true, "durable": '
119
 
                'false, "type": "fanout"}, "type": "topic"}, "create": '
120
 
                '"always", "link": {"x-declare": {"auto-delete": true, '
121
 
                '"exclusive": true, "durable": false}, "durable": true, '
122
 
                '"name": "impl_qpid_test_fanout_.*"}}$')
123
 
        else:
124
 
            expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
125
 
                '{"auto-delete": true, "durable": true}, "type": "topic"}, '
126
 
                '"create": "always", "link": {"x-declare": {"auto-delete": '
127
 
                'true, "exclusive": false, "durable": false}, "durable": '
128
 
                'true, "name": "impl_qpid_test"}}')
129
 
        self.mock_session.receiver(expected_address).AndReturn(
130
 
                                                        self.mock_receiver)
131
 
        self.mock_receiver.capacity = 1
132
 
        self.mock_connection.close()
133
 
 
134
 
        self.mox.ReplayAll()
135
 
 
136
 
        connection = impl_qpid.create_connection(FLAGS)
137
 
        connection.create_consumer("impl_qpid_test",
138
 
                                   lambda *_x, **_y: None,
139
 
                                   fanout)
140
 
        connection.close()
141
 
 
142
 
    @test.skip_if(qpid is None, "Test requires qpid")
143
 
    def test_create_consumer(self):
144
 
        self._test_create_consumer(fanout=False)
145
 
 
146
 
    @test.skip_if(qpid is None, "Test requires qpid")
147
 
    def test_create_consumer_fanout(self):
148
 
        self._test_create_consumer(fanout=True)
149
 
 
150
 
    @test.skip_if(qpid is None, "Test requires qpid")
151
 
    def test_create_worker(self):
152
 
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
153
 
        self.mock_session = self.mox.CreateMock(self.orig_session)
154
 
        self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
155
 
 
156
 
        self.mock_connection.opened().AndReturn(False)
157
 
        self.mock_connection.open()
158
 
        self.mock_connection.session().AndReturn(self.mock_session)
159
 
        expected_address = (
160
 
            'nova/impl_qpid_test ; {"node": {"x-declare": '
161
 
            '{"auto-delete": true, "durable": true}, "type": "topic"}, '
162
 
            '"create": "always", "link": {"x-declare": {"auto-delete": '
163
 
            'true, "exclusive": false, "durable": false}, "durable": '
164
 
            'true, "name": "impl.qpid.test.workers"}}')
165
 
        self.mock_session.receiver(expected_address).AndReturn(
166
 
                                                        self.mock_receiver)
167
 
        self.mock_receiver.capacity = 1
168
 
        self.mock_connection.close()
169
 
 
170
 
        self.mox.ReplayAll()
171
 
 
172
 
        connection = impl_qpid.create_connection(FLAGS)
173
 
        connection.create_worker("impl_qpid_test",
174
 
                                 lambda *_x, **_y: None,
175
 
                                 'impl.qpid.test.workers',
176
 
                                 )
177
 
        connection.close()
178
 
 
179
 
    def _test_cast(self, fanout, server_params=None):
180
 
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
181
 
        self.mock_session = self.mox.CreateMock(self.orig_session)
182
 
        self.mock_sender = self.mox.CreateMock(self.orig_sender)
183
 
 
184
 
        self.mock_connection.opened().AndReturn(False)
185
 
        self.mock_connection.open()
186
 
 
187
 
        self.mock_connection.session().AndReturn(self.mock_session)
188
 
        if fanout:
189
 
            expected_address = ('impl_qpid_test_fanout ; '
190
 
                '{"node": {"x-declare": {"auto-delete": true, '
191
 
                '"durable": false, "type": "fanout"}, '
192
 
                '"type": "topic"}, "create": "always"}')
193
 
        else:
194
 
            expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
195
 
                '{"auto-delete": true, "durable": false}, "type": "topic"}, '
196
 
                '"create": "always"}')
197
 
        self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
198
 
        self.mock_sender.send(mox.IgnoreArg())
199
 
        if not server_params:
200
 
            # This is a pooled connection, so instead of closing it, it
201
 
            # gets reset, which is just creating a new session on the
202
 
            # connection.
203
 
            self.mock_session.close()
204
 
            self.mock_connection.session().AndReturn(self.mock_session)
205
 
 
206
 
        self.mox.ReplayAll()
207
 
 
208
 
        try:
209
 
            ctx = context.RequestContext("user", "project")
210
 
 
211
 
            args = [FLAGS, ctx, "impl_qpid_test",
212
 
                    {"method": "test_method", "args": {}}]
213
 
 
214
 
            if server_params:
215
 
                args.insert(2, server_params)
216
 
                if fanout:
217
 
                    method = impl_qpid.fanout_cast_to_server
218
 
                else:
219
 
                    method = impl_qpid.cast_to_server
220
 
            else:
221
 
                if fanout:
222
 
                    method = impl_qpid.fanout_cast
223
 
                else:
224
 
                    method = impl_qpid.cast
225
 
 
226
 
            method(*args)
227
 
        finally:
228
 
            while impl_qpid.Connection.pool.free_items:
229
 
                # Pull the mock connection object out of the connection pool so
230
 
                # that it doesn't mess up other test cases.
231
 
                impl_qpid.Connection.pool.get()
232
 
 
233
 
    @test.skip_if(qpid is None, "Test requires qpid")
234
 
    def test_cast(self):
235
 
        self._test_cast(fanout=False)
236
 
 
237
 
    @test.skip_if(qpid is None, "Test requires qpid")
238
 
    def test_fanout_cast(self):
239
 
        self._test_cast(fanout=True)
240
 
 
241
 
    def _setup_to_server_tests(self, server_params):
242
 
        class MyConnection(impl_qpid.Connection):
243
 
            def __init__(myself, *args, **kwargs):
244
 
                super(MyConnection, myself).__init__(*args, **kwargs)
245
 
                self.assertEqual(myself.connection.username,
246
 
                        server_params['username'])
247
 
                self.assertEqual(myself.connection.password,
248
 
                        server_params['password'])
249
 
                self.assertEqual(myself.broker,
250
 
                        server_params['hostname'] + ':' +
251
 
                                str(server_params['port']))
252
 
 
253
 
        MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
254
 
        self.stubs.Set(impl_qpid, 'Connection', MyConnection)
255
 
 
256
 
    @test.skip_if(qpid is None, "Test requires qpid")
257
 
    def test_cast_to_server(self):
258
 
        server_params = {'username': 'fake_username',
259
 
                         'password': 'fake_password',
260
 
                         'hostname': 'fake_hostname',
261
 
                         'port': 31337}
262
 
        self._setup_to_server_tests(server_params)
263
 
        self._test_cast(fanout=False, server_params=server_params)
264
 
 
265
 
    @test.skip_if(qpid is None, "Test requires qpid")
266
 
    def test_fanout_cast_to_server(self):
267
 
        server_params = {'username': 'fake_username',
268
 
                         'password': 'fake_password',
269
 
                         'hostname': 'fake_hostname',
270
 
                         'port': 31337}
271
 
        self._setup_to_server_tests(server_params)
272
 
        self._test_cast(fanout=True, server_params=server_params)
273
 
 
274
 
    def _test_call(self, multi):
275
 
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
276
 
        self.mock_session = self.mox.CreateMock(self.orig_session)
277
 
        self.mock_sender = self.mox.CreateMock(self.orig_sender)
278
 
        self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
279
 
 
280
 
        self.mock_connection.opened().AndReturn(False)
281
 
        self.mock_connection.open()
282
 
        self.mock_connection.session().AndReturn(self.mock_session)
283
 
        rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
284
 
                   ' true, "durable": true, "type": "direct"}, "type": '
285
 
                   '"topic"}, "create": "always", "link": {"x-declare": '
286
 
                   '{"auto-delete": true, "exclusive": true, "durable": '
287
 
                   'false}, "durable": true, "name": ".*"}}')
288
 
        self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
289
 
        self.mock_receiver.capacity = 1
290
 
        send_addr = ('nova/impl_qpid_test ; {"node": {"x-declare": '
291
 
            '{"auto-delete": true, "durable": false}, "type": "topic"}, '
292
 
            '"create": "always"}')
293
 
        self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
294
 
        self.mock_sender.send(mox.IgnoreArg())
295
 
 
296
 
        self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
297
 
                                                        self.mock_receiver)
298
 
        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
299
 
                        {"result": "foo", "failure": False, "ending": False}))
300
 
        if multi:
301
 
            self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
302
 
                                                        self.mock_receiver)
303
 
            self.mock_receiver.fetch().AndReturn(
304
 
                            qpid.messaging.Message(
305
 
                                {"result": "bar", "failure": False,
306
 
                                 "ending": False}))
307
 
            self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
308
 
                                                        self.mock_receiver)
309
 
            self.mock_receiver.fetch().AndReturn(
310
 
                            qpid.messaging.Message(
311
 
                                {"result": "baz", "failure": False,
312
 
                                 "ending": False}))
313
 
        self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
314
 
                                                        self.mock_receiver)
315
 
        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
316
 
                        {"failure": False, "ending": True}))
317
 
        self.mock_session.close()
318
 
        self.mock_connection.session().AndReturn(self.mock_session)
319
 
 
320
 
        self.mox.ReplayAll()
321
 
 
322
 
        try:
323
 
            ctx = context.RequestContext("user", "project")
324
 
 
325
 
            if multi:
326
 
                method = impl_qpid.multicall
327
 
            else:
328
 
                method = impl_qpid.call
329
 
 
330
 
            res = method(FLAGS, ctx, "impl_qpid_test",
331
 
                           {"method": "test_method", "args": {}})
332
 
 
333
 
            if multi:
334
 
                self.assertEquals(list(res), ["foo", "bar", "baz"])
335
 
            else:
336
 
                self.assertEquals(res, "foo")
337
 
        finally:
338
 
            while impl_qpid.Connection.pool.free_items:
339
 
                # Pull the mock connection object out of the connection pool so
340
 
                # that it doesn't mess up other test cases.
341
 
                impl_qpid.Connection.pool.get()
342
 
 
343
 
    @test.skip_if(qpid is None, "Test requires qpid")
344
 
    def test_call(self):
345
 
        self._test_call(multi=False)
346
 
 
347
 
    @test.skip_if(qpid is None, "Test requires qpid")
348
 
    def test_multicall(self):
349
 
        self._test_call(multi=True)
350
 
 
351
 
 
352
 
#
353
 
#from nova.tests.rpc import common
354
 
#
355
 
# Qpid does not have a handy in-memory transport like kombu, so it's not
356
 
# terribly straight forward to take advantage of the common unit tests.
357
 
# However, at least at the time of this writing, the common unit tests all pass
358
 
# with qpidd running.
359
 
#
360
 
# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
361
 
#     def setUp(self):
362
 
#         self.rpc = impl_qpid
363
 
#         super(RpcQpidCommonTestCase, self).setUp()
364
 
#
365
 
#     def tearDown(self):
366
 
#         super(RpcQpidCommonTestCase, self).tearDown()
367
 
#