1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2010 United States Government as represented by the
4
# Administrator of the National Aeronautics and Space Administration.
7
# Licensed under the Apache License, Version 2.0 (the "License"); you may
8
# not use this file except in compliance with the License. You may obtain
9
# a copy of the License at
11
# http://www.apache.org/licenses/LICENSE-2.0
13
# Unless required by applicable law or agreed to in writing, software
14
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
# License for the specific language governing permissions and limitations
19
"""
Unit Tests for remote procedure calls shared between all implementations
"""
25
from eventlet import greenthread
28
from nova import context
29
from nova import exception
30
from nova import flags
31
from nova import log as logging
32
from nova.rpc import amqp as rpc_amqp
33
from nova.rpc import common as rpc_common
34
from nova.rpc import dispatcher as rpc_dispatcher
39
LOG = logging.getLogger(__name__)
42
class BaseRpcTestCase(test.TestCase):
    """Driver-independent RPC tests shared by every backend test case.

    ``self.rpc`` is never assigned in this class, so a subclass has to set
    it to the RPC driver under test before ``setUp`` runs.  A falsy
    ``self.rpc`` is treated as "driver not available" and makes every test
    skip instead of fail.
    """

    def setUp(self, supports_timeouts=True, topic='test',
              topic_nested='nested'):
        """Record the topics and start a ``TestReceiver`` consumer.

        :param supports_timeouts: when False, ``test_call_timeout`` skips.
        :param topic: topic the default ``TestReceiver`` consumer listens on.
        :param topic_nested: topic for tests that start a second consumer.
        """
        super(BaseRpcTestCase, self).setUp()
        self.topic = topic or self.topic
        self.topic_nested = topic_nested or self.topic_nested
        self.supports_timeouts = supports_timeouts
        self.context = context.get_admin_context()

        # Without a driver no consumer can be created; the tests skip
        # themselves in that case, so setUp must not blow up first.
        if self.rpc:
            receiver = TestReceiver()
            self.conn = self._create_consumer(receiver, self.topic)

    def tearDown(self):
        """Close the consumer opened by ``setUp``, then run base teardown."""
        try:
            if self.rpc:
                # NOTE(review): assumes the driver connection exposes
                # close() -- confirm against the rpc Connection API.
                self.conn.close()
        finally:
            super(BaseRpcTestCase, self).tearDown()

    def _require_rpc(self):
        """Skip the running test when no RPC driver is configured."""
        if not self.rpc:
            raise nose.SkipTest('rpc driver not available.')

    def _create_consumer(self, proxy, topic, fanout=False):
        """Start a consumer thread dispatching ``topic`` messages to proxy.

        :param proxy: object whose methods are invoked by incoming messages.
        :param topic: topic to consume from.
        :param fanout: passed through to ``create_consumer``.
        :returns: the connection, so that the caller can keep and close it.
        """
        dispatcher = rpc_dispatcher.RpcDispatcher([proxy])
        conn = self.rpc.create_connection(FLAGS, True)
        conn.create_consumer(topic, dispatcher, fanout)
        conn.consume_in_thread()
        return conn

    def test_call_succeed(self):
        """A plain call returns the value echoed by the receiver."""
        self._require_rpc()

        value = 42
        result = self.rpc.call(FLAGS, self.context, self.topic,
                               {"method": "echo", "args": {"value": value}})
        self.assertEqual(value, result)

    def test_call_succeed_despite_multiple_returns_yield(self):
        """call() on a generator method returns the last yielded value."""
        self._require_rpc()

        value = 42
        result = self.rpc.call(FLAGS, self.context, self.topic,
                               {"method": "echo_three_times_yield",
                                "args": {"value": value}})
        self.assertEqual(value + 2, result)

    def test_multicall_succeed_once(self):
        """multicall() on a non-generator method yields one response."""
        self._require_rpc()

        value = 42
        result = self.rpc.multicall(FLAGS, self.context,
                                    self.topic,
                                    {"method": "echo",
                                     "args": {"value": value}})
        for i, x in enumerate(result):
            if i > 0:
                self.fail('should only receive one response')
            self.assertEqual(value + i, x)

    def test_multicall_three_nones(self):
        """None responses are delivered rather than swallowed."""
        self._require_rpc()

        value = 42
        result = self.rpc.multicall(FLAGS, self.context,
                                    self.topic,
                                    {"method": "multicall_three_nones",
                                     "args": {"value": value}})
        # Materialise the responses so that an empty result fails the
        # assertion instead of leaving a loop index unbound.
        self.assertEqual([None, None, None], list(result))

    def test_multicall_succeed_three_times_yield(self):
        """multicall() yields every value produced by a generator method."""
        self._require_rpc()

        value = 42
        result = self.rpc.multicall(FLAGS, self.context,
                                    self.topic,
                                    {"method": "echo_three_times_yield",
                                     "args": {"value": value}})
        for i, x in enumerate(result):
            self.assertEqual(value + i, x)

    def test_context_passed(self):
        """Makes sure a context is passed through rpc call."""
        self._require_rpc()

        value = 42
        result = self.rpc.call(FLAGS, self.context,
                               self.topic, {"method": "context",
                                            "args": {"value": value}})
        self.assertEqual(self.context.to_dict(), result)

    def _test_cast(self, fanout=False):
        """Test casts by pushing items through a channeled queue.

        :param fanout: selects ``fanout_cast`` instead of ``cast`` and is
                       also used for the consumer created for the test.
        """
        self._require_rpc()

        # Not a true global, but capitalized so
        # it is clear it is leaking scope into Nested()
        QUEUE = eventlet.queue.Queue()

        # We use the nested topic so we don't need QUEUE to be a proper
        # global, and do not keep state outside this test.
        class Nested(object):
            @staticmethod
            def put_queue(context, value):
                LOG.debug("Got value in put_queue: %s", value)
                QUEUE.put(value)

        nested = Nested()
        conn = self._create_consumer(nested, self.topic_nested, fanout)
        try:
            value = 42

            method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
            method(FLAGS, self.context,
                   self.topic_nested,
                   {"method": "put_queue",
                    "args": {"value": value}})

            try:
                # If it does not succeed in 2 seconds, give up and assume
                # failure.
                result = QUEUE.get(True, 2)
            except eventlet.queue.Empty:
                self.fail("cast was not delivered within 2 seconds")
        finally:
            # NOTE(review): assumes the connection exposes close() --
            # closing here keeps the nested consumer from outliving the test.
            conn.close()
        self.assertEqual(value, result)

    def test_cast_success(self):
        """A direct cast reaches the consumer."""
        self._test_cast(False)

    def test_fanout_success(self):
        """A fanout cast reaches the consumer."""
        self._test_cast(True)

    def test_nested_calls(self):
        """Test that we can do an rpc.call inside another call."""
        self._require_rpc()

        rpc = self.rpc

        class Nested(object):
            @staticmethod
            def echo(context, queue, value):
                """Calls echo in the passed queue."""
                LOG.debug(_("Nested received %(queue)s, %(value)s"),
                          {"queue": queue, "value": value})
                # so, it will replay the context and use the same REQID?
                ret = rpc.call(FLAGS, context,
                               queue,
                               {"method": "echo",
                                "args": {"value": value}})
                LOG.debug(_("Nested return %s"), ret)
                # Hand back the inner result so the outer assertion really
                # depends on the nested call having worked.
                return ret

        nested = Nested()
        conn = self._create_consumer(nested, self.topic_nested)
        try:
            value = 42
            result = self.rpc.call(FLAGS, self.context,
                                   self.topic_nested,
                                   {"method": "echo",
                                    "args": {"queue": "test",
                                             "value": value}})
        finally:
            conn.close()
        self.assertEqual(value, result)

    def test_call_timeout(self):
        """Make sure rpc.call will time out."""
        self._require_rpc()
        if not self.supports_timeouts:
            raise nose.SkipTest(_("RPC backend does not support timeouts"))

        value = 42
        # NOTE(review): "block" is taken to be the receiver method that
        # outlasts the one second timeout -- confirm against TestReceiver.
        self.assertRaises(rpc_common.Timeout,
                          self.rpc.call,
                          FLAGS, self.context,
                          self.topic,
                          {"method": "block",
                           "args": {"value": value}}, timeout=1)
        try:
            self.rpc.call(FLAGS, self.context,
                          self.topic,
                          {"method": "block",
                           "args": {"value": value}},
                          timeout=1)
            self.fail("should have thrown Timeout")
        except rpc_common.Timeout:
            pass
239
class BaseRpcAMQPTestCase(BaseRpcTestCase):
    """Base test class for all AMQP-based RPC tests."""

    def test_proxycallback_handles_exceptions(self):
        """Make sure exceptions unpacking messages don't cause hangs."""
        if not self.rpc:
            raise nose.SkipTest('rpc driver not available.')

        orig_unpack = rpc_amqp.unpack_context

        # Mutable cell so the stub below can report that it has been called.
        info = {'unpacked': False}

        def fake_unpack_context(*args, **kwargs):
            info['unpacked'] = True
            raise test.TestingException('moo')

        self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)

        value = 41
        self.rpc.cast(FLAGS, self.context, self.topic,
                      {"method": "echo", "args": {"value": value}})

        # Wait for the cast to complete.
        # NOTE(review): the retry count is a reconstruction (about five
        # seconds in total at 0.1s per poll) -- adjust if the suite uses a
        # different budget.
        for _attempt in range(50):
            if info['unpacked']:
                break
            greenthread.sleep(0.1)
        else:
            self.fail("Timeout waiting for message to be consumed")

        # Now see if we get a response even though we raised an
        # exception for the cast above.
        self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)

        value = 42
        result = self.rpc.call(FLAGS, self.context, self.topic,
                               {"method": "echo",
                                "args": {"value": value}})
        self.assertEqual(value, result)
279
class TestReceiver(object):
280
"""Simple Proxy class so the consumer has methods to call.
282
Uses static methods because we aren't actually storing any state.
286
def echo(context, value):
287
"""Simply returns whatever value is sent in."""
288
LOG.debug(_("Received %s"), value)
292
def context(context, value):
293
"""Returns dictionary version of context."""
294
LOG.debug(_("Received %s"), context)
295
return context.to_dict()
298
def multicall_three_nones(context, value):
304
def echo_three_times_yield(context, value):
310
def fail(context, value):
311
"""Raises an exception with the value sent in."""
312
raise NotImplementedError(value)
315
def fail_converted(context, value):
316
"""Raises an exception with the value sent in."""
317
raise exception.ConvertedException(explanation=value)
320
def block(context, value):