~ubuntu-branches/ubuntu/trusty/cinder/trusty

« back to all changes in this revision

Viewing changes to cinder/tests/rpc/test_qpid.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-22 09:57:46 UTC
  • Revision ID: package-import@ubuntu.com-20120522095746-9lm71yvzltjybk4b
Tags: upstream-2012.2~f1~20120503.2
ImportĀ upstreamĀ versionĀ 2012.2~f1~20120503.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2010 United States Government as represented by the
 
4
# Administrator of the National Aeronautics and Space Administration.
 
5
# All Rights Reserved.
 
6
# Copyright 2012, Red Hat, Inc.
 
7
#
 
8
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
9
#    not use this file except in compliance with the License. You may obtain
 
10
#    a copy of the License at
 
11
#
 
12
#         http://www.apache.org/licenses/LICENSE-2.0
 
13
#
 
14
#    Unless required by applicable law or agreed to in writing, software
 
15
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
16
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
17
#    License for the specific language governing permissions and limitations
 
18
#    under the License.
 
19
"""
 
20
Unit Tests for remote procedure calls using qpid
 
21
"""
 
22
 
 
23
import mox
 
24
 
 
25
from cinder import context
 
26
from cinder import flags
 
27
from cinder import log as logging
 
28
from cinder.rpc import amqp as rpc_amqp
 
29
from cinder import test
 
30
 
 
31
try:
 
32
    import qpid
 
33
    from cinder.rpc import impl_qpid
 
34
except ImportError:
 
35
    qpid = None
 
36
    impl_qpid = None
 
37
 
 
38
 
 
39
FLAGS = flags.FLAGS
 
40
LOG = logging.getLogger(__name__)
 
41
 
 
42
 
 
43
class RpcQpidTestCase(test.TestCase):
 
44
    """
 
45
    Exercise the public API of impl_qpid utilizing mox.
 
46
 
 
47
    This set of tests utilizes mox to replace the Qpid objects and ensures
 
48
    that the right operations happen on them when the various public rpc API
 
49
    calls are exercised.  The API calls tested here include:
 
50
 
 
51
        cinder.rpc.create_connection()
 
52
        cinder.rpc.common.Connection.create_consumer()
 
53
        cinder.rpc.common.Connection.close()
 
54
        cinder.rpc.cast()
 
55
        cinder.rpc.fanout_cast()
 
56
        cinder.rpc.call()
 
57
        cinder.rpc.multicall()
 
58
    """
 
59
 
 
60
    def setUp(self):
 
61
        super(RpcQpidTestCase, self).setUp()
 
62
 
 
63
        self.mock_connection = None
 
64
        self.mock_session = None
 
65
        self.mock_sender = None
 
66
        self.mock_receiver = None
 
67
 
 
68
        if qpid:
 
69
            impl_qpid.register_opts(FLAGS)
 
70
            self.orig_connection = qpid.messaging.Connection
 
71
            self.orig_session = qpid.messaging.Session
 
72
            self.orig_sender = qpid.messaging.Sender
 
73
            self.orig_receiver = qpid.messaging.Receiver
 
74
            qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
 
75
            qpid.messaging.Session = lambda *_x, **_y: self.mock_session
 
76
            qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
 
77
            qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
 
78
 
 
79
    def tearDown(self):
 
80
        if qpid:
 
81
            qpid.messaging.Connection = self.orig_connection
 
82
            qpid.messaging.Session = self.orig_session
 
83
            qpid.messaging.Sender = self.orig_sender
 
84
            qpid.messaging.Receiver = self.orig_receiver
 
85
        if impl_qpid:
 
86
            # Need to reset this in case we changed the connection_cls
 
87
            # in self._setup_to_server_tests()
 
88
            impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
 
89
 
 
90
        super(RpcQpidTestCase, self).tearDown()
 
91
 
 
92
    @test.skip_if(qpid is None, "Test requires qpid")
 
93
    def test_create_connection(self):
 
94
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
 
95
        self.mock_session = self.mox.CreateMock(self.orig_session)
 
96
 
 
97
        self.mock_connection.opened().AndReturn(False)
 
98
        self.mock_connection.open()
 
99
        self.mock_connection.session().AndReturn(self.mock_session)
 
100
        self.mock_connection.close()
 
101
 
 
102
        self.mox.ReplayAll()
 
103
 
 
104
        connection = impl_qpid.create_connection(FLAGS)
 
105
        connection.close()
 
106
 
 
107
    def _test_create_consumer(self, fanout):
 
108
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
 
109
        self.mock_session = self.mox.CreateMock(self.orig_session)
 
110
        self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
 
111
 
 
112
        self.mock_connection.opened().AndReturn(False)
 
113
        self.mock_connection.open()
 
114
        self.mock_connection.session().AndReturn(self.mock_session)
 
115
        if fanout:
 
116
            # The link name includes a UUID, so match it with a regex.
 
117
            expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
 
118
                '{"node": {"x-declare": {"auto-delete": true, "durable": '
 
119
                'false, "type": "fanout"}, "type": "topic"}, "create": '
 
120
                '"always", "link": {"x-declare": {"auto-delete": true, '
 
121
                '"exclusive": true, "durable": false}, "durable": true, '
 
122
                '"name": "impl_qpid_test_fanout_.*"}}$')
 
123
        else:
 
124
            expected_address = (
 
125
                'cinder/impl_qpid_test ; {"node": {"x-declare": '
 
126
                '{"auto-delete": true, "durable": true}, "type": "topic"}, '
 
127
                '"create": "always", "link": {"x-declare": {"auto-delete": '
 
128
                'true, "exclusive": false, "durable": false}, "durable": '
 
129
                'true, "name": "impl_qpid_test"}}')
 
130
        self.mock_session.receiver(expected_address).AndReturn(
 
131
                                                        self.mock_receiver)
 
132
        self.mock_receiver.capacity = 1
 
133
        self.mock_connection.close()
 
134
 
 
135
        self.mox.ReplayAll()
 
136
 
 
137
        connection = impl_qpid.create_connection(FLAGS)
 
138
        connection.create_consumer("impl_qpid_test",
 
139
                                   lambda *_x, **_y: None,
 
140
                                   fanout)
 
141
        connection.close()
 
142
 
 
143
    @test.skip_if(qpid is None, "Test requires qpid")
 
144
    def test_create_consumer(self):
 
145
        self._test_create_consumer(fanout=False)
 
146
 
 
147
    @test.skip_if(qpid is None, "Test requires qpid")
 
148
    def test_create_consumer_fanout(self):
 
149
        self._test_create_consumer(fanout=True)
 
150
 
 
151
    def _test_cast(self, fanout, server_params=None):
 
152
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
 
153
        self.mock_session = self.mox.CreateMock(self.orig_session)
 
154
        self.mock_sender = self.mox.CreateMock(self.orig_sender)
 
155
 
 
156
        self.mock_connection.opened().AndReturn(False)
 
157
        self.mock_connection.open()
 
158
 
 
159
        self.mock_connection.session().AndReturn(self.mock_session)
 
160
        if fanout:
 
161
            expected_address = ('impl_qpid_test_fanout ; '
 
162
                '{"node": {"x-declare": {"auto-delete": true, '
 
163
                '"durable": false, "type": "fanout"}, '
 
164
                '"type": "topic"}, "create": "always"}')
 
165
        else:
 
166
            expected_address = (
 
167
                'cinder/impl_qpid_test ; {"node": {"x-declare": '
 
168
                '{"auto-delete": true, "durable": false}, "type": "topic"}, '
 
169
                '"create": "always"}')
 
170
        self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
 
171
        self.mock_sender.send(mox.IgnoreArg())
 
172
        if not server_params:
 
173
            # This is a pooled connection, so instead of closing it, it
 
174
            # gets reset, which is just creating a new session on the
 
175
            # connection.
 
176
            self.mock_session.close()
 
177
            self.mock_connection.session().AndReturn(self.mock_session)
 
178
 
 
179
        self.mox.ReplayAll()
 
180
 
 
181
        try:
 
182
            ctx = context.RequestContext("user", "project")
 
183
 
 
184
            args = [FLAGS, ctx, "impl_qpid_test",
 
185
                    {"method": "test_method", "args": {}}]
 
186
 
 
187
            if server_params:
 
188
                args.insert(2, server_params)
 
189
                if fanout:
 
190
                    method = impl_qpid.fanout_cast_to_server
 
191
                else:
 
192
                    method = impl_qpid.cast_to_server
 
193
            else:
 
194
                if fanout:
 
195
                    method = impl_qpid.fanout_cast
 
196
                else:
 
197
                    method = impl_qpid.cast
 
198
 
 
199
            method(*args)
 
200
        finally:
 
201
            while impl_qpid.Connection.pool.free_items:
 
202
                # Pull the mock connection object out of the connection pool so
 
203
                # that it doesn't mess up other test cases.
 
204
                impl_qpid.Connection.pool.get()
 
205
 
 
206
    @test.skip_if(qpid is None, "Test requires qpid")
 
207
    def test_cast(self):
 
208
        self._test_cast(fanout=False)
 
209
 
 
210
    @test.skip_if(qpid is None, "Test requires qpid")
 
211
    def test_fanout_cast(self):
 
212
        self._test_cast(fanout=True)
 
213
 
 
214
    def _setup_to_server_tests(self, server_params):
 
215
        class MyConnection(impl_qpid.Connection):
 
216
            def __init__(myself, *args, **kwargs):
 
217
                super(MyConnection, myself).__init__(*args, **kwargs)
 
218
                self.assertEqual(myself.connection.username,
 
219
                        server_params['username'])
 
220
                self.assertEqual(myself.connection.password,
 
221
                        server_params['password'])
 
222
                self.assertEqual(myself.broker,
 
223
                        server_params['hostname'] + ':' +
 
224
                                str(server_params['port']))
 
225
 
 
226
        MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
 
227
        self.stubs.Set(impl_qpid, 'Connection', MyConnection)
 
228
 
 
229
    @test.skip_if(qpid is None, "Test requires qpid")
 
230
    def test_cast_to_server(self):
 
231
        server_params = {'username': 'fake_username',
 
232
                         'password': 'fake_password',
 
233
                         'hostname': 'fake_hostname',
 
234
                         'port': 31337}
 
235
        self._setup_to_server_tests(server_params)
 
236
        self._test_cast(fanout=False, server_params=server_params)
 
237
 
 
238
    @test.skip_if(qpid is None, "Test requires qpid")
 
239
    def test_fanout_cast_to_server(self):
 
240
        server_params = {'username': 'fake_username',
 
241
                         'password': 'fake_password',
 
242
                         'hostname': 'fake_hostname',
 
243
                         'port': 31337}
 
244
        self._setup_to_server_tests(server_params)
 
245
        self._test_cast(fanout=True, server_params=server_params)
 
246
 
 
247
    def _test_call(self, multi):
 
248
        self.mock_connection = self.mox.CreateMock(self.orig_connection)
 
249
        self.mock_session = self.mox.CreateMock(self.orig_session)
 
250
        self.mock_sender = self.mox.CreateMock(self.orig_sender)
 
251
        self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
 
252
 
 
253
        self.mock_connection.opened().AndReturn(False)
 
254
        self.mock_connection.open()
 
255
        self.mock_connection.session().AndReturn(self.mock_session)
 
256
        rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
 
257
                   ' true, "durable": true, "type": "direct"}, "type": '
 
258
                   '"topic"}, "create": "always", "link": {"x-declare": '
 
259
                   '{"auto-delete": true, "exclusive": true, "durable": '
 
260
                   'false}, "durable": true, "name": ".*"}}')
 
261
        self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
 
262
        self.mock_receiver.capacity = 1
 
263
        send_addr = ('cinder/impl_qpid_test ; {"node": {"x-declare": '
 
264
            '{"auto-delete": true, "durable": false}, "type": "topic"}, '
 
265
            '"create": "always"}')
 
266
        self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
 
267
        self.mock_sender.send(mox.IgnoreArg())
 
268
 
 
269
        self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 
270
                                                        self.mock_receiver)
 
271
        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
 
272
                        {"result": "foo", "failure": False, "ending": False}))
 
273
        if multi:
 
274
            self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 
275
                                                        self.mock_receiver)
 
276
            self.mock_receiver.fetch().AndReturn(
 
277
                            qpid.messaging.Message(
 
278
                                {"result": "bar", "failure": False,
 
279
                                 "ending": False}))
 
280
            self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 
281
                                                        self.mock_receiver)
 
282
            self.mock_receiver.fetch().AndReturn(
 
283
                            qpid.messaging.Message(
 
284
                                {"result": "baz", "failure": False,
 
285
                                 "ending": False}))
 
286
        self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
 
287
                                                        self.mock_receiver)
 
288
        self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
 
289
                        {"failure": False, "ending": True}))
 
290
        self.mock_session.close()
 
291
        self.mock_connection.session().AndReturn(self.mock_session)
 
292
 
 
293
        self.mox.ReplayAll()
 
294
 
 
295
        try:
 
296
            ctx = context.RequestContext("user", "project")
 
297
 
 
298
            if multi:
 
299
                method = impl_qpid.multicall
 
300
            else:
 
301
                method = impl_qpid.call
 
302
 
 
303
            res = method(FLAGS, ctx, "impl_qpid_test",
 
304
                           {"method": "test_method", "args": {}})
 
305
 
 
306
            if multi:
 
307
                self.assertEquals(list(res), ["foo", "bar", "baz"])
 
308
            else:
 
309
                self.assertEquals(res, "foo")
 
310
        finally:
 
311
            while impl_qpid.Connection.pool.free_items:
 
312
                # Pull the mock connection object out of the connection pool so
 
313
                # that it doesn't mess up other test cases.
 
314
                impl_qpid.Connection.pool.get()
 
315
 
 
316
    @test.skip_if(qpid is None, "Test requires qpid")
 
317
    def test_call(self):
 
318
        self._test_call(multi=False)
 
319
 
 
320
    @test.skip_if(qpid is None, "Test requires qpid")
 
321
    def test_multicall(self):
 
322
        self._test_call(multi=True)
 
323
 
 
324
 
 
325
#
 
326
#from cinder.tests.rpc import common
 
327
#
 
328
# Qpid does not have a handy in-memory transport like kombu, so it's not
 
329
# terribly straight forward to take advantage of the common unit tests.
 
330
# However, at least at the time of this writing, the common unit tests all pass
 
331
# with qpidd running.
 
332
#
 
333
# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
 
334
#     def setUp(self):
 
335
#         self.rpc = impl_qpid
 
336
#         super(RpcQpidCommonTestCase, self).setUp()
 
337
#
 
338
#     def tearDown(self):
 
339
#         super(RpcQpidCommonTestCase, self).tearDown()
 
340
#