1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
19
Unit Tests for remote procedure calls shared between all implementations
24
from eventlet import greenthread
27
from cinder import context
28
from cinder import exception
29
from cinder import flags
30
from cinder import log as logging
31
from cinder.rpc import amqp as rpc_amqp
32
from cinder.rpc import common as rpc_common
33
from cinder import test
37
LOG = logging.getLogger(__name__)
40
class BaseRpcTestCase(test.TestCase):
41
def setUp(self, supports_timeouts=True):
42
super(BaseRpcTestCase, self).setUp()
43
self.conn = self.rpc.create_connection(FLAGS, True)
44
self.receiver = TestReceiver()
45
self.conn.create_consumer('test', self.receiver, False)
46
self.conn.consume_in_thread()
47
self.context = context.get_admin_context()
48
self.supports_timeouts = supports_timeouts
52
super(BaseRpcTestCase, self).tearDown()
54
def test_call_succeed(self):
56
result = self.rpc.call(FLAGS, self.context, 'test',
57
{"method": "echo", "args": {"value": value}})
58
self.assertEqual(value, result)
60
def test_call_succeed_despite_multiple_returns_yield(self):
62
result = self.rpc.call(FLAGS, self.context, 'test',
63
{"method": "echo_three_times_yield",
64
"args": {"value": value}})
65
self.assertEqual(value + 2, result)
67
def test_multicall_succeed_once(self):
69
result = self.rpc.multicall(FLAGS, self.context,
72
"args": {"value": value}})
73
for i, x in enumerate(result):
75
self.fail('should only receive one response')
76
self.assertEqual(value + i, x)
78
def test_multicall_three_nones(self):
80
result = self.rpc.multicall(FLAGS, self.context,
82
{"method": "multicall_three_nones",
83
"args": {"value": value}})
84
for i, x in enumerate(result):
85
self.assertEqual(x, None)
86
# i should have been 0, 1, and finally 2:
87
self.assertEqual(i, 2)
89
def test_multicall_succeed_three_times_yield(self):
91
result = self.rpc.multicall(FLAGS, self.context,
93
{"method": "echo_three_times_yield",
94
"args": {"value": value}})
95
for i, x in enumerate(result):
96
self.assertEqual(value + i, x)
98
def test_context_passed(self):
99
"""Makes sure a context is passed through rpc call."""
101
result = self.rpc.call(FLAGS, self.context,
102
'test', {"method": "context",
103
"args": {"value": value}})
104
self.assertEqual(self.context.to_dict(), result)
106
def test_nested_calls(self):
107
"""Test that we can do an rpc.call inside another call."""
108
class Nested(object):
110
def echo(context, queue, value):
111
"""Calls echo in the passed queue"""
112
LOG.debug(_("Nested received %(queue)s, %(value)s")
115
# so, it will replay the context and use the same REQID?
117
ret = self.rpc.call(FLAGS, context,
120
"args": {"value": value}})
121
LOG.debug(_("Nested return %s"), ret)
125
conn = self.rpc.create_connection(FLAGS, True)
126
conn.create_consumer('nested', nested, False)
127
conn.consume_in_thread()
129
result = self.rpc.call(FLAGS, self.context,
130
'nested', {"method": "echo",
131
"args": {"queue": "test",
134
self.assertEqual(value, result)
136
def test_call_timeout(self):
137
"""Make sure rpc.call will time out"""
138
if not self.supports_timeouts:
139
raise nose.SkipTest(_("RPC backend does not support timeouts"))
142
self.assertRaises(rpc_common.Timeout,
147
"args": {"value": value}}, timeout=1)
149
self.rpc.call(FLAGS, self.context,
152
"args": {"value": value}},
154
self.fail("should have thrown Timeout")
155
except rpc_common.Timeout as exc:
159
class BaseRpcAMQPTestCase(BaseRpcTestCase):
160
"""Base test class for all AMQP-based RPC tests"""
161
def test_proxycallback_handles_exceptions(self):
162
"""Make sure exceptions unpacking messages don't cause hangs."""
163
orig_unpack = rpc_amqp.unpack_context
165
info = {'unpacked': False}
167
def fake_unpack_context(*args, **kwargs):
168
info['unpacked'] = True
169
raise test.TestingException('moo')
171
self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
174
self.rpc.cast(FLAGS, self.context, 'test',
175
{"method": "echo", "args": {"value": value}})
177
# Wait for the cast to complete.
181
greenthread.sleep(0.1)
183
self.fail("Timeout waiting for message to be consued")
185
# Now see if we get a response even though we raised an
186
# exception for the cast above.
187
self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
190
result = self.rpc.call(FLAGS, self.context, 'test',
192
"args": {"value": value}})
193
self.assertEqual(value, result)
196
class TestReceiver(object):
197
"""Simple Proxy class so the consumer has methods to call.
199
Uses static methods because we aren't actually storing any state.
204
def echo(context, value):
205
"""Simply returns whatever value is sent in."""
206
LOG.debug(_("Received %s"), value)
210
def context(context, value):
211
"""Returns dictionary version of context."""
212
LOG.debug(_("Received %s"), context)
213
return context.to_dict()
216
def multicall_three_nones(context, value):
222
def echo_three_times_yield(context, value):
228
def fail(context, value):
229
"""Raises an exception with the value sent in."""
230
raise NotImplementedError(value)
233
def fail_converted(context, value):
234
"""Raises an exception with the value sent in."""
235
raise exception.ConvertedException(explanation=value)
238
def block(context, value):