1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
6
# Copyright 2012, Red Hat, Inc.
8
# Licensed under the Apache License, Version 2.0 (the "License"); you may
9
# not use this file except in compliance with the License. You may obtain
10
# a copy of the License at
12
# http://www.apache.org/licenses/LICENSE-2.0
14
# Unless required by applicable law or agreed to in writing, software
15
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17
# License for the specific language governing permissions and limitations
20
Unit Tests for remote procedure calls using qpid
25
from cinder import context
26
from cinder import flags
27
from cinder import log as logging
28
from cinder.rpc import amqp as rpc_amqp
29
from cinder import test
33
from cinder.rpc import impl_qpid
# Module-level logger, named after this module.
40
LOG = logging.getLogger(__name__)
# Test case class for impl_qpid, derived from test.TestCase.
# NOTE(review): the prose below reads like the class docstring, but no quote
# delimiters are visible in this extract, and the embedded line numbers skip
# some entries of the API list -- confirm against the original file.
43
class RpcQpidTestCase(test.TestCase):
45
Exercise the public API of impl_qpid utilizing mox.
47
This set of tests utilizes mox to replace the Qpid objects and ensures
48
that the right operations happen on them when the various public rpc API
49
calls are exercised. The API calls tested here include:
51
cinder.rpc.create_connection()
52
cinder.rpc.common.Connection.create_consumer()
53
cinder.rpc.common.Connection.close()
55
cinder.rpc.fanout_cast()
57
cinder.rpc.multicall()
# NOTE(review): the `def` line of this method is not visible in this extract;
# the super(...).setUp() call below suggests it is the test case's setUp --
# confirm against the original file.
#
# Visible behaviour: clear the four mock placeholders, call
# impl_qpid.register_opts(FLAGS), remember the real qpid.messaging classes on
# self.orig_*, then replace each class with a factory that ignores its
# arguments and returns whatever mock the running test has stored on self.
# NOTE(review): the embedded numbers jump from 66 to 69, so a statement
# (possibly a guard around the qpid-specific part) may be missing here.
61
super(RpcQpidTestCase, self).setUp()
63
self.mock_connection = None
64
self.mock_session = None
65
self.mock_sender = None
66
self.mock_receiver = None
69
impl_qpid.register_opts(FLAGS)
# Keep references to the real classes so they can be put back later.
70
self.orig_connection = qpid.messaging.Connection
71
self.orig_session = qpid.messaging.Session
72
self.orig_sender = qpid.messaging.Sender
73
self.orig_receiver = qpid.messaging.Receiver
# The lambdas read self.mock_* when called, not when defined, so each test
# can assign its own mocks after this point.
74
qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
75
qpid.messaging.Session = lambda *_x, **_y: self.mock_session
76
qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
77
qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
# NOTE(review): the `def` line of this method is not visible in this extract;
# the super(...).tearDown() call at the end suggests it is tearDown -- confirm.
#
# Visible behaviour: put the real qpid.messaging classes (saved on
# self.orig_*) back in place, point the connection pool's connection_cls at
# impl_qpid.Connection again, then run the base-class tearDown.
# NOTE(review): the embedded numbers skip 85 and 89; any guards around these
# statements are not visible here.
81
qpid.messaging.Connection = self.orig_connection
82
qpid.messaging.Session = self.orig_session
83
qpid.messaging.Sender = self.orig_sender
84
qpid.messaging.Receiver = self.orig_receiver
86
# Need to reset this in case we changed the connection_cls
87
# in self._setup_to_server_tests()
88
impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
90
super(RpcQpidTestCase, self).tearDown()
# Test for impl_qpid.create_connection(FLAGS) against mocked Qpid objects.
# The recorded expectations are: opened() returning False, open(), session()
# returning the mock session, and close() on the mock connection.
# NOTE(review): the embedded numbers jump 100 -> 104 and the next method
# starts at 107, so statements between the recorded expectations and the call
# (and after the call) are presumably missing from this extract -- confirm
# against the original file.
92
@test.skip_if(qpid is None, "Test requires qpid")
93
def test_create_connection(self):
94
self.mock_connection = self.mox.CreateMock(self.orig_connection)
95
self.mock_session = self.mox.CreateMock(self.orig_session)
97
self.mock_connection.opened().AndReturn(False)
98
self.mock_connection.open()
99
self.mock_connection.session().AndReturn(self.mock_session)
100
self.mock_connection.close()
104
connection = impl_qpid.create_connection(FLAGS)
# Shared scenario for the create_consumer tests; `fanout` is supplied by the
# two public tests as False or True.
#
# Visible behaviour: build mocks for the connection, session and receiver,
# record the connection being opened and a session obtained, record a
# receiver being requested for an expected address, set the receiver
# capacity to 1, record the connection being closed, then create a connection
# through impl_qpid and call create_consumer("impl_qpid_test", ...) on it.
#
# NOTE(review): this extract is incomplete. Two address literals appear back
# to back (a fanout regex, then a plain 'cinder/impl_qpid_test' string) and
# the embedded numbers jump 122 -> 125, 130 -> 132, 133 -> 137 and 139 -> 143.
# Presumably the branch on `fanout`, the tail of the receiver(...) expectation
# and the final create_consumer argument(s) are among the missing lines --
# confirm against the original file.
107
def _test_create_consumer(self, fanout):
108
self.mock_connection = self.mox.CreateMock(self.orig_connection)
109
self.mock_session = self.mox.CreateMock(self.orig_session)
110
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
112
self.mock_connection.opened().AndReturn(False)
113
self.mock_connection.open()
114
self.mock_connection.session().AndReturn(self.mock_session)
116
# The link name includes a UUID, so match it with a regex.
117
expected_address = mox.Regex(r'^impl_qpid_test_fanout ; '
118
'{"node": {"x-declare": {"auto-delete": true, "durable": '
119
'false, "type": "fanout"}, "type": "topic"}, "create": '
120
'"always", "link": {"x-declare": {"auto-delete": true, '
121
'"exclusive": true, "durable": false}, "durable": true, '
122
'"name": "impl_qpid_test_fanout_.*"}}$')
125
'cinder/impl_qpid_test ; {"node": {"x-declare": '
126
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
127
'"create": "always", "link": {"x-declare": {"auto-delete": '
128
'true, "exclusive": false, "durable": false}, "durable": '
129
'true, "name": "impl_qpid_test"}}')
130
self.mock_session.receiver(expected_address).AndReturn(
132
self.mock_receiver.capacity = 1
133
self.mock_connection.close()
137
connection = impl_qpid.create_connection(FLAGS)
138
connection.create_consumer("impl_qpid_test",
139
lambda *_x, **_y: None,
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_consumer(self):
    # Run the shared consumer-creation scenario with fanout disabled.
    self._test_create_consumer(fanout=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_consumer_fanout(self):
    # Run the shared consumer-creation scenario with fanout enabled.
    self._test_create_consumer(fanout=True)
# Shared scenario for the cast tests. `fanout` is passed by the callers as
# True or False; `server_params` defaults to None and, when given by the
# *_to_server tests, is a dict of connection settings.
#
# Visible behaviour: build mocks for the connection, session and sender,
# record the connection being opened and a session obtained, record a sender
# being requested for an expected address and one send() with any argument.
# When no server_params are given, additionally record the session being
# closed and a new session being requested. Then build the argument list
# [FLAGS, ctx, "impl_qpid_test", message], pick one of the impl_qpid cast
# functions, and finally drain impl_qpid.Connection.pool.
#
# NOTE(review): this extract is incomplete. Two address literals and four
# `method = ...` assignments appear without the conditionals that choose
# between them, and the embedded numbers jump in several places (164 -> 167,
# 177 -> 182, 185 -> 188, 197 -> 201). Presumably the branches on `fanout` and
# `server_params` and the actual call of `method` are among the missing lines
# -- confirm against the original file.
151
def _test_cast(self, fanout, server_params=None):
152
self.mock_connection = self.mox.CreateMock(self.orig_connection)
153
self.mock_session = self.mox.CreateMock(self.orig_session)
154
self.mock_sender = self.mox.CreateMock(self.orig_sender)
156
self.mock_connection.opened().AndReturn(False)
157
self.mock_connection.open()
159
self.mock_connection.session().AndReturn(self.mock_session)
161
expected_address = ('impl_qpid_test_fanout ; '
162
'{"node": {"x-declare": {"auto-delete": true, '
163
'"durable": false, "type": "fanout"}, '
164
'"type": "topic"}, "create": "always"}')
167
'cinder/impl_qpid_test ; {"node": {"x-declare": '
168
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
169
'"create": "always"}')
170
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
171
self.mock_sender.send(mox.IgnoreArg())
172
if not server_params:
173
# This is a pooled connection, so instead of closing it, it
174
# gets reset, which is just creating a new session on the connection.
176
self.mock_session.close()
177
self.mock_connection.session().AndReturn(self.mock_session)
182
ctx = context.RequestContext("user", "project")
184
args = [FLAGS, ctx, "impl_qpid_test",
185
{"method": "test_method", "args": {}}]
# Puts server_params at index 2 of args, i.e. right after ctx.
188
args.insert(2, server_params)
190
method = impl_qpid.fanout_cast_to_server
192
method = impl_qpid.cast_to_server
195
method = impl_qpid.fanout_cast
197
method = impl_qpid.cast
201
while impl_qpid.Connection.pool.free_items:
202
# Pull the mock connection object out of the connection pool so
203
# that it doesn't mess up other test cases.
204
impl_qpid.Connection.pool.get()
# Runs the shared cast scenario with fanout disabled and no server_params.
# NOTE(review): the `def` line (embedded number 207) is not visible in this
# extract; the name test_cast is inferred from the call below -- confirm.
206
@test.skip_if(qpid is None, "Test requires qpid")
208
self._test_cast(fanout=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast(self):
    # Run the shared cast scenario with fanout enabled and no server_params.
    self._test_cast(fanout=True)
def _setup_to_server_tests(self, server_params):
    """Install a Connection subclass that verifies ``server_params``.

    Right after the base constructor runs, the subclass asserts that the
    connection's username and password, and the broker string
    ("hostname:port"), equal the values found in ``server_params``.  The
    subclass gets its own rpc_amqp.Pool and is stubbed in as
    ``impl_qpid.Connection``.
    """
    case = self

    class MyConnection(impl_qpid.Connection):
        def __init__(conn, *args, **kwargs):
            super(MyConnection, conn).__init__(*args, **kwargs)
            # Check the credentials one field at a time: username, then
            # password.
            for field in ('username', 'password'):
                case.assertEqual(getattr(conn.connection, field),
                                 server_params[field])
            actual_broker = conn.broker
            wanted_broker = (server_params['hostname'] + ':' +
                             str(server_params['port']))
            case.assertEqual(actual_broker, wanted_broker)

    MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
    self.stubs.Set(impl_qpid, 'Connection', MyConnection)
# Runs the shared cast scenario with fanout disabled and explicit
# server_params, after installing the checking Connection subclass via
# _setup_to_server_tests().
# NOTE(review): the dict literal below is not closed in this extract (the
# embedded numbers jump 233 -> 235); _setup_to_server_tests() also reads
# server_params['port'], so a 'port' entry is presumably on the missing line
# -- confirm against the original file.
229
@test.skip_if(qpid is None, "Test requires qpid")
230
def test_cast_to_server(self):
231
server_params = {'username': 'fake_username',
232
'password': 'fake_password',
233
'hostname': 'fake_hostname',
235
self._setup_to_server_tests(server_params)
236
self._test_cast(fanout=False, server_params=server_params)
# Runs the shared cast scenario with fanout enabled and explicit
# server_params, after installing the checking Connection subclass via
# _setup_to_server_tests().
# NOTE(review): the dict literal below is not closed in this extract (the
# embedded numbers jump 242 -> 244); _setup_to_server_tests() also reads
# server_params['port'], so a 'port' entry is presumably on the missing line
# -- confirm against the original file.
238
@test.skip_if(qpid is None, "Test requires qpid")
239
def test_fanout_cast_to_server(self):
240
server_params = {'username': 'fake_username',
241
'password': 'fake_password',
242
'hostname': 'fake_hostname',
244
self._setup_to_server_tests(server_params)
245
self._test_cast(fanout=True, server_params=server_params)
# Shared scenario for the call tests; `multi` is passed by the callers as
# False or True.
#
# Visible behaviour: build mocks for the connection, session, sender and
# receiver; record the connection being opened and a session obtained; record
# a receiver being requested for an address matching a regex, with capacity
# set to 1; record a sender being requested for 'cinder/impl_qpid_test ...'
# and one send() with any argument. Four next_receiver()/fetch() rounds are
# then recorded, yielding messages with results "foo", "bar", "baz" and a
# final message with "ending": True, followed by the session being closed and
# a new session requested. The chosen impl_qpid function is called with
# (FLAGS, ctx, "impl_qpid_test", message) and the result is compared with
# either ["foo", "bar", "baz"] (after list()) or "foo". Finally
# impl_qpid.Connection.pool is drained.
#
# NOTE(review): this extract is incomplete. Both `method = ...` assignments
# and both assertEquals calls appear without the conditional on `multi` that
# chooses between them, each next_receiver(...).AndReturn( call lacks its
# argument line, and the "bar"/"baz" message dicts are not closed. The
# embedded numbers confirm the gaps (e.g. 269 -> 271, 278 -> 280, 291 -> 296,
# 304 -> 307) -- confirm against the original file.
247
def _test_call(self, multi):
248
self.mock_connection = self.mox.CreateMock(self.orig_connection)
249
self.mock_session = self.mox.CreateMock(self.orig_session)
250
self.mock_sender = self.mox.CreateMock(self.orig_sender)
251
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
253
self.mock_connection.opened().AndReturn(False)
254
self.mock_connection.open()
255
self.mock_connection.session().AndReturn(self.mock_session)
256
rcv_addr = mox.Regex(r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
257
' true, "durable": true, "type": "direct"}, "type": '
258
'"topic"}, "create": "always", "link": {"x-declare": '
259
'{"auto-delete": true, "exclusive": true, "durable": '
260
'false}, "durable": true, "name": ".*"}}')
261
self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
262
self.mock_receiver.capacity = 1
263
send_addr = ('cinder/impl_qpid_test ; {"node": {"x-declare": '
264
'{"auto-delete": true, "durable": false}, "type": "topic"}, '
265
'"create": "always"}')
266
self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
267
self.mock_sender.send(mox.IgnoreArg())
269
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
271
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
272
{"result": "foo", "failure": False, "ending": False}))
274
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
276
self.mock_receiver.fetch().AndReturn(
277
qpid.messaging.Message(
278
{"result": "bar", "failure": False,
280
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
282
self.mock_receiver.fetch().AndReturn(
283
qpid.messaging.Message(
284
{"result": "baz", "failure": False,
286
self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
288
self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
289
{"failure": False, "ending": True}))
290
self.mock_session.close()
291
self.mock_connection.session().AndReturn(self.mock_session)
296
ctx = context.RequestContext("user", "project")
299
method = impl_qpid.multicall
301
method = impl_qpid.call
303
res = method(FLAGS, ctx, "impl_qpid_test",
304
{"method": "test_method", "args": {}})
307
self.assertEquals(list(res), ["foo", "bar", "baz"])
309
self.assertEquals(res, "foo")
311
while impl_qpid.Connection.pool.free_items:
312
# Pull the mock connection object out of the connection pool so
313
# that it doesn't mess up other test cases.
314
impl_qpid.Connection.pool.get()
# Runs the shared call scenario with multi disabled.
# NOTE(review): the `def` line (embedded number 317) is not visible in this
# extract; the name test_call is inferred from the call below -- confirm.
316
@test.skip_if(qpid is None, "Test requires qpid")
318
self._test_call(multi=False)
@test.skip_if(qpid is None, "Test requires qpid")
def test_multicall(self):
    # Run the shared call scenario with multi enabled.
    self._test_call(multi=True)
326
#from cinder.tests.rpc import common
328
# Qpid does not have a handy in-memory transport like kombu, so it's not
329
# terribly straightforward to take advantage of the common unit tests.
330
# However, at least at the time of this writing, the common unit tests all pass
331
# with qpidd running.
333
# class RpcQpidCommonTestCase(common._BaseRpcTestCase):
335
# self.rpc = impl_qpid
336
# super(RpcQpidCommonTestCase, self).setUp()
338
# def tearDown(self):
339
# super(RpcQpidCommonTestCase, self).tearDown()