1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
6
# Copyright 2012, Red Hat, Inc.
8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
9
# not use this file except in compliance with the License. You may obtain
10
# a copy of the License at
12
# http://www.apache.org/licenses/LICENSE-2.0
14
# Unless required by applicable law or agreed to in writing, software
15
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
# License for the specific language governing permissions and limitations
20
Unit Tests for remote procedure calls using qpid
25
from nova import context
26
from nova import flags
27
from nova import log as logging
28
from nova.rpc import amqp as rpc_amqp
32
from nova.rpc import impl_qpid
40
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
43
# Test case for the public API of impl_qpid, with the Qpid objects replaced
# by mox mocks (as described in the text that follows the class header).
# NOTE(review): the text below reads like the class docstring, but in this
# copy its triple-quote delimiters and all indentation are absent and bare
# numbers are interleaved between the lines; the list of API calls may also
# be incomplete. Confirm against the original file.
class RpcQpidTestCase(test.TestCase):
45
Exercise the public API of impl_qpid utilizing mox.
47
This set of tests utilizes mox to replace the Qpid objects and ensures
48
that the right operations happen on them when the various public rpc API
49
calls are exercised. The API calls tested here include:
51
nova.rpc.create_connection()
52
nova.rpc.common.Connection.create_consumer()
53
nova.rpc.common.Connection.close()
55
nova.rpc.fanout_cast()
61
# Fixture set-up body: chains to the parent class's setUp(), clears the mock
# slots, saves the real qpid.messaging classes and swaps in mock factories.
# NOTE(review): no 'def' header is visible above this body in this copy --
# presumably 'def setUp(self):'; confirm against the original file.
super(RpcQpidTestCase, self).setUp()
63
# No mocks are installed yet; each test assigns the ones it needs.
self.mock_connection = None
64
self.mock_session = None
65
self.mock_sender = None
66
self.mock_receiver = None
69
# Presumably registers impl_qpid's options on FLAGS -- verify in impl_qpid.
impl_qpid.register_opts(FLAGS)
70
# Keep references to the real qpid.messaging classes before replacing them.
self.orig_connection = qpid.messaging.Connection
71
self.orig_session = qpid.messaging.Session
72
self.orig_sender = qpid.messaging.Sender
73
self.orig_receiver = qpid.messaging.Receiver
74
# Replace each qpid.messaging class with a callable that ignores its
# arguments and returns whatever mock is stored on this test case at the
# time of the call (the lambdas read self.mock_* lazily).
qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
75
qpid.messaging.Session = lambda *_x, **_y: self.mock_session
76
qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
77
qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
81
# Fixture tear-down body: restores the qpid.messaging classes from the
# self.orig_* references, resets the connection pool's class and chains to
# the parent class's tearDown().
# NOTE(review): no 'def' header is visible above this body in this copy --
# presumably 'def tearDown(self):'; confirm against the original file.
qpid.messaging.Connection = self.orig_connection
82
qpid.messaging.Session = self.orig_session
83
qpid.messaging.Sender = self.orig_sender
84
qpid.messaging.Receiver = self.orig_receiver
86
# Need to reset this in case we changed the connection_cls
87
# in self._setup_to_server_tests()
88
impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
90
super(RpcQpidTestCase, self).tearDown()
92
# Records the qpid calls expected when a connection is created -- opened()
# returning False, open(), session(), and finally close() -- and then calls
# impl_qpid.create_connection(FLAGS). Decorated to be skipped when the qpid
# module is unavailable (qpid is None).
# NOTE(review): the mox replay/verify steps and the call that would trigger
# the expected close() are not visible in this copy -- confirm against the
# original file.
@test.skip_if(qpid is None, "Test requires qpid")
93
def test_create_connection(self):
94
# Mocks are built from the real classes saved in self.orig_*.
self.mock_connection = self.mox.CreateMock(self.orig_connection)
95
self.mock_session = self.mox.CreateMock(self.orig_session)
97
self.mock_connection.opened().AndReturn(False)
98
self.mock_connection.open()
99
self.mock_connection.session().AndReturn(self.mock_session)
100
self.mock_connection.close()
104
connection = impl_qpid.create_connection(FLAGS)
107
# Shared helper for the create_consumer tests; 'fanout' selects the variant.
# It records the expected connection/session/receiver calls on mox mocks and
# then calls create_consumer() for topic "impl_qpid_test" on a connection
# obtained from impl_qpid.create_connection(FLAGS).
# NOTE(review): this copy is missing lines. expected_address is assigned
# twice in a row (a fanout regex, then a plain topic address) with no visible
# 'if fanout:'/'else:' between them, and the AndReturn( and create_consumer(
# calls are left unclosed. Confirm against the original file.
def _test_create_consumer(self, fanout):
108
self.mock_connection = self.mox.CreateMock(self.orig_connection)
109
self.mock_session = self.mox.CreateMock(self.orig_session)
110
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
112
self.mock_connection.opened().AndReturn(False)
113
self.mock_connection.open()
114
self.mock_connection.session().AndReturn(self.mock_session)
116
# The link name includes a UUID, so match it with a regex.
117
expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
118
'{"node": {"x-declare": {"auto-delete": true, "durable": '
119
'false, "type": "fanout"}, "type": "topic"}, "create": '
120
'"always", "link": {"x-declare": {"auto-delete": true, '
121
'"exclusive": true, "durable": false}, "durable": true, '
122
'"name": "impl_qpid_test_fanout_.*"}}$')
124
# Plain (non-regex) address with the fixed link name "impl_qpid_test".
expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
125
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
126
'"create": "always", "link": {"x-declare": {"auto-delete": '
127
'true, "exclusive": false, "durable": false}, "durable": '
128
'true, "name": "impl_qpid_test"}}')
129
self.mock_session.receiver(expected_address).AndReturn(
131
self.mock_receiver.capacity = 1
132
self.mock_connection.close()
136
connection = impl_qpid.create_connection(FLAGS)
137
connection.create_consumer("impl_qpid_test",
138
lambda *_x, **_y: None,
142
# Runs the shared create_consumer scenario with fanout=False.
# Decorated to be skipped when the qpid module is unavailable.
@test.skip_if(qpid is None, "Test requires qpid")
143
def test_create_consumer(self):
144
self._test_create_consumer(fanout=False)
146
# Runs the shared create_consumer scenario with fanout=True.
# Decorated to be skipped when the qpid module is unavailable.
@test.skip_if(qpid is None, "Test requires qpid")
147
def test_create_consumer_fanout(self):
148
self._test_create_consumer(fanout=True)
150
# Records the qpid calls expected when a worker is created: the session's
# receiver() is expected for a topic address whose link name is
# "impl.qpid.test.workers", the receiver's capacity is set to 1, and the
# connection is closed. The test then calls create_worker() for topic
# "impl_qpid_test" with that same name.
# Decorated to be skipped when the qpid module is unavailable.
# NOTE(review): this copy is missing lines. The address string literal
# appears without the 'expected_address = (' opener, and the AndReturn( and
# create_worker( calls are left unclosed. Confirm against the original file.
@test.skip_if(qpid is None, "Test requires qpid")
151
def test_create_worker(self):
152
self.mock_connection = self.mox.CreateMock(self.orig_connection)
153
self.mock_session = self.mox.CreateMock(self.orig_session)
154
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
156
self.mock_connection.opened().AndReturn(False)
157
self.mock_connection.open()
158
self.mock_connection.session().AndReturn(self.mock_session)
160
'nova/impl_qpid_test ; {"node": {"x-declare": '
161
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
162
'"create": "always", "link": {"x-declare": {"auto-delete": '
163
'true, "exclusive": false, "durable": false}, "durable": '
164
'true, "name": "impl.qpid.test.workers"}}')
165
self.mock_session.receiver(expected_address).AndReturn(
167
self.mock_receiver.capacity = 1
168
self.mock_connection.close()
172
connection = impl_qpid.create_connection(FLAGS)
173
connection.create_worker("impl_qpid_test",
174
lambda *_x, **_y: None,
175
'impl.qpid.test.workers',
179
# Shared helper for the cast tests; 'fanout' and 'server_params' select the
# variant. The expectations recorded on the mox mocks are: open a connection
# and a session, create a sender for the expected address and send one
# message (any argument). Afterwards the connection pool is drained.
# NOTE(review): this copy is missing lines. expected_address and 'method'
# are each assigned several times in a row with no visible branch on
# 'fanout'/'server_params', and no call of 'method' is visible. Confirm
# against the original file.
def _test_cast(self, fanout, server_params=None):
180
self.mock_connection = self.mox.CreateMock(self.orig_connection)
181
self.mock_session = self.mox.CreateMock(self.orig_session)
182
self.mock_sender = self.mox.CreateMock(self.orig_sender)
184
self.mock_connection.opened().AndReturn(False)
185
self.mock_connection.open()
187
self.mock_connection.session().AndReturn(self.mock_session)
189
# Fanout-style address (node type "fanout").
expected_address = ('impl_qpid_test_fanout ; '
190
'{"node": {"x-declare": {"auto-delete": true, '
191
'"durable": false, "type": "fanout"}, '
192
'"type": "topic"}, "create": "always"}')
194
# Plain topic address under "nova/".
expected_address = ('nova/impl_qpid_test ; {"node": {"x-declare": '
195
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
196
'"create": "always"}')
197
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
198
self.mock_sender.send(mox.IgnoreArg())
199
if not server_params:
200
# This is a pooled connection, so instead of closing it, it
201
# gets reset, which is just creating a new session on the
203
self.mock_session.close()
204
self.mock_connection.session().AndReturn(self.mock_session)
209
ctx = context.RequestContext("user", "project")
211
# Positional arguments for the cast call: FLAGS, context, topic, message.
args = [FLAGS, ctx, "impl_qpid_test",
212
{"method": "test_method", "args": {}}]
215
# Places server_params at index 2, i.e. directly after the context.
args.insert(2, server_params)
217
method = impl_qpid.fanout_cast_to_server
219
method = impl_qpid.cast_to_server
222
method = impl_qpid.fanout_cast
224
method = impl_qpid.cast
228
while impl_qpid.Connection.pool.free_items:
229
# Pull the mock connection object out of the connection pool so
230
# that it doesn't mess up other test cases.
231
impl_qpid.Connection.pool.get()
233
# Runs the shared cast scenario with fanout=False and no server_params.
# Decorated to be skipped when the qpid module is unavailable.
# NOTE(review): no 'def' header is visible between the decorator and the
# body in this copy -- presumably 'def test_cast(self):'; confirm against
# the original file.
@test.skip_if(qpid is None, "Test requires qpid")
235
self._test_cast(fanout=False)
237
# Runs the shared cast scenario with fanout=True and no server_params.
# Decorated to be skipped when the qpid module is unavailable.
@test.skip_if(qpid is None, "Test requires qpid")
238
def test_fanout_cast(self):
239
self._test_cast(fanout=True)
241
# Prepares the *_to_server tests: defines a Connection subclass whose
# constructor asserts that the connection was built from 'server_params'
# (username, password and "hostname:port" broker), gives that subclass its
# own pool, and stubs it in as impl_qpid.Connection.
# 'server_params' is indexed with the keys 'username', 'password',
# 'hostname' and 'port'.
def _setup_to_server_tests(self, server_params):
242
class MyConnection(impl_qpid.Connection):
243
# The instance parameter is named 'myself' so that 'self' inside
# __init__ still refers to the enclosing test case and its assertEqual().
def __init__(myself, *args, **kwargs):
244
super(MyConnection, myself).__init__(*args, **kwargs)
245
self.assertEqual(myself.connection.username,
246
server_params['username'])
247
self.assertEqual(myself.connection.password,
248
server_params['password'])
249
self.assertEqual(myself.broker,
250
server_params['hostname'] + ':' +
251
str(server_params['port']))
253
# Pool built for the subclass; the stub below swaps the class into impl_qpid.
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
254
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
256
# Runs the shared cast scenario with fanout=False against explicit
# server parameters, after installing the checking Connection subclass via
# _setup_to_server_tests(). Decorated to be skipped when qpid is unavailable.
# NOTE(review): the server_params dict literal is left unclosed in this copy
# and no 'port' entry is visible, although _setup_to_server_tests() reads
# server_params['port']. Confirm against the original file.
@test.skip_if(qpid is None, "Test requires qpid")
257
def test_cast_to_server(self):
258
server_params = {'username': 'fake_username',
259
'password': 'fake_password',
260
'hostname': 'fake_hostname',
262
self._setup_to_server_tests(server_params)
263
self._test_cast(fanout=False, server_params=server_params)
265
# Runs the shared cast scenario with fanout=True against explicit
# server parameters, after installing the checking Connection subclass via
# _setup_to_server_tests(). Decorated to be skipped when qpid is unavailable.
# NOTE(review): the server_params dict literal is left unclosed in this copy
# and no 'port' entry is visible, although _setup_to_server_tests() reads
# server_params['port']. Confirm against the original file.
@test.skip_if(qpid is None, "Test requires qpid")
266
def test_fanout_cast_to_server(self):
267
server_params = {'username': 'fake_username',
268
'password': 'fake_password',
269
'hostname': 'fake_hostname',
271
self._setup_to_server_tests(server_params)
272
self._test_cast(fanout=True, server_params=server_params)
274
# Shared helper for the call/multicall tests; 'multi' selects the variant.
# Expectations recorded on the mox mocks: open a connection and session,
# create a receiver whose address is matched by regex, create a sender for
# the "nova/impl_qpid_test" topic and send one message, then four
# next_receiver()/fetch() rounds delivering the results "foo", "bar", "baz"
# and a final message with "ending": True, after which the session is closed
# and a new one is requested. The chosen rpc function is then called and its
# result asserted; finally the connection pool is drained.
# NOTE(review): this copy is missing lines. Several AndReturn( calls and two
# message dict literals are left unclosed, 'method' is assigned twice in a
# row, and both assertEquals lines appear back to back with no visible
# 'if multi:'/'else:'. Presumably the list assertion belongs to multicall and
# the "foo" assertion to call. Confirm against the original file.
def _test_call(self, multi):
275
self.mock_connection = self.mox.CreateMock(self.orig_connection)
276
self.mock_session = self.mox.CreateMock(self.orig_session)
277
self.mock_sender = self.mox.CreateMock(self.orig_sender)
278
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
280
self.mock_connection.opened().AndReturn(False)
281
self.mock_connection.open()
282
self.mock_connection.session().AndReturn(self.mock_session)
283
rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
284
' true, "durable": true, "type": "direct"}, "type": '
285
'"topic"}, "create": "always", "link": {"x-declare": '
286
'{"auto-delete": true, "exclusive": true, "durable": '
287
'false}, "durable": true, "name": ".*"}}')
288
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
289
self.mock_receiver.capacity = 1
290
send_addr = ('nova/impl_qpid_test ; {"node": {"x-declare": '
291
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
292
'"create": "always"}')
293
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
294
self.mock_sender.send(mox.IgnoreArg())
296
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
298
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
299
{"result": "foo", "failure": False, "ending": False}))
301
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
303
self.mock_receiver.fetch().AndReturn(
304
qpid.messaging.Message(
305
{"result": "bar", "failure": False,
307
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
309
self.mock_receiver.fetch().AndReturn(
310
qpid.messaging.Message(
311
{"result": "baz", "failure": False,
313
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
315
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
316
{"failure": False, "ending": True}))
317
self.mock_session.close()
318
self.mock_connection.session().AndReturn(self.mock_session)
323
ctx = context.RequestContext("user", "project")
326
method = impl_qpid.multicall
328
method = impl_qpid.call
330
res = method(FLAGS, ctx, "impl_qpid_test",
331
{"method": "test_method", "args": {}})
334
self.assertEquals(list(res), ["foo", "bar", "baz"])
336
self.assertEquals(res, "foo")
338
while impl_qpid.Connection.pool.free_items:
339
# Pull the mock connection object out of the connection pool so
340
# that it doesn't mess up other test cases.
341
impl_qpid.Connection.pool.get()
343
# Runs the shared call scenario with multi=False.
# Decorated to be skipped when the qpid module is unavailable.
# NOTE(review): no 'def' header is visible between the decorator and the
# body in this copy -- presumably 'def test_call(self):'; confirm against
# the original file.
@test.skip_if(qpid is None, "Test requires qpid")
345
self._test_call(multi=False)
347
# Runs the shared call scenario with multi=True.
# Decorated to be skipped when the qpid module is unavailable.
@test.skip_if(qpid is None, "Test requires qpid")
348
def test_multicall(self):
349
self._test_call(multi=True)
353
#from nova.tests.rpc import common
355
# Qpid does not have a handy in-memory transport like kombu, so it's not
356
# terribly straightforward to take advantage of the common unit tests.
357
# However, at least at the time of this writing, the common unit tests all pass
358
# with qpidd running.
360
# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
362
# self.rpc = impl_qpid
363
# super(RpcQpidCommonTestCase, self).setUp()
365
# def tearDown(self):
366
# super(RpcQpidCommonTestCase, self).tearDown()