~ubuntu-branches/ubuntu/trusty/cinder/trusty

« back to all changes in this revision

Viewing changes to cinder/tests/rpc/common.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-22 09:57:46 UTC
  • Revision ID: package-import@ubuntu.com-20120522095746-9lm71yvzltjybk4b
Tags: upstream-2012.2~f1~20120503.2
ImportĀ upstreamĀ versionĀ 2012.2~f1~20120503.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2010 United States Government as represented by the
 
4
# Administrator of the National Aeronautics and Space Administration.
 
5
# All Rights Reserved.
 
6
#
 
7
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
#    not use this file except in compliance with the License. You may obtain
 
9
#    a copy of the License at
 
10
#
 
11
#         http://www.apache.org/licenses/LICENSE-2.0
 
12
#
 
13
#    Unless required by applicable law or agreed to in writing, software
 
14
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
#    License for the specific language governing permissions and limitations
 
17
#    under the License.
 
18
"""
 
19
Unit Tests for remote procedure calls shared between all implementations
 
20
"""
 
21
 
 
22
import time
 
23
 
 
24
from eventlet import greenthread
 
25
import nose
 
26
 
 
27
from cinder import context
 
28
from cinder import exception
 
29
from cinder import flags
 
30
from cinder import log as logging
 
31
from cinder.rpc import amqp as rpc_amqp
 
32
from cinder.rpc import common as rpc_common
 
33
from cinder import test
 
34
 
 
35
 
 
36
FLAGS = flags.FLAGS
 
37
LOG = logging.getLogger(__name__)
 
38
 
 
39
 
 
40
class BaseRpcTestCase(test.TestCase):
 
41
    def setUp(self, supports_timeouts=True):
 
42
        super(BaseRpcTestCase, self).setUp()
 
43
        self.conn = self.rpc.create_connection(FLAGS, True)
 
44
        self.receiver = TestReceiver()
 
45
        self.conn.create_consumer('test', self.receiver, False)
 
46
        self.conn.consume_in_thread()
 
47
        self.context = context.get_admin_context()
 
48
        self.supports_timeouts = supports_timeouts
 
49
 
 
50
    def tearDown(self):
 
51
        self.conn.close()
 
52
        super(BaseRpcTestCase, self).tearDown()
 
53
 
 
54
    def test_call_succeed(self):
 
55
        value = 42
 
56
        result = self.rpc.call(FLAGS, self.context, 'test',
 
57
                               {"method": "echo", "args": {"value": value}})
 
58
        self.assertEqual(value, result)
 
59
 
 
60
    def test_call_succeed_despite_multiple_returns_yield(self):
 
61
        value = 42
 
62
        result = self.rpc.call(FLAGS, self.context, 'test',
 
63
                          {"method": "echo_three_times_yield",
 
64
                           "args": {"value": value}})
 
65
        self.assertEqual(value + 2, result)
 
66
 
 
67
    def test_multicall_succeed_once(self):
 
68
        value = 42
 
69
        result = self.rpc.multicall(FLAGS, self.context,
 
70
                              'test',
 
71
                              {"method": "echo",
 
72
                               "args": {"value": value}})
 
73
        for i, x in enumerate(result):
 
74
            if i > 0:
 
75
                self.fail('should only receive one response')
 
76
            self.assertEqual(value + i, x)
 
77
 
 
78
    def test_multicall_three_nones(self):
 
79
        value = 42
 
80
        result = self.rpc.multicall(FLAGS, self.context,
 
81
                              'test',
 
82
                              {"method": "multicall_three_nones",
 
83
                               "args": {"value": value}})
 
84
        for i, x in enumerate(result):
 
85
            self.assertEqual(x, None)
 
86
        # i should have been 0, 1, and finally 2:
 
87
        self.assertEqual(i, 2)
 
88
 
 
89
    def test_multicall_succeed_three_times_yield(self):
 
90
        value = 42
 
91
        result = self.rpc.multicall(FLAGS, self.context,
 
92
                              'test',
 
93
                              {"method": "echo_three_times_yield",
 
94
                               "args": {"value": value}})
 
95
        for i, x in enumerate(result):
 
96
            self.assertEqual(value + i, x)
 
97
 
 
98
    def test_context_passed(self):
 
99
        """Makes sure a context is passed through rpc call."""
 
100
        value = 42
 
101
        result = self.rpc.call(FLAGS, self.context,
 
102
                          'test', {"method": "context",
 
103
                                   "args": {"value": value}})
 
104
        self.assertEqual(self.context.to_dict(), result)
 
105
 
 
106
    def test_nested_calls(self):
 
107
        """Test that we can do an rpc.call inside another call."""
 
108
        class Nested(object):
 
109
            @staticmethod
 
110
            def echo(context, queue, value):
 
111
                """Calls echo in the passed queue"""
 
112
                LOG.debug(_("Nested received %(queue)s, %(value)s")
 
113
                        % locals())
 
114
                # TODO(comstud):
 
115
                # so, it will replay the context and use the same REQID?
 
116
                # that's bizarre.
 
117
                ret = self.rpc.call(FLAGS, context,
 
118
                               queue,
 
119
                               {"method": "echo",
 
120
                                "args": {"value": value}})
 
121
                LOG.debug(_("Nested return %s"), ret)
 
122
                return value
 
123
 
 
124
        nested = Nested()
 
125
        conn = self.rpc.create_connection(FLAGS, True)
 
126
        conn.create_consumer('nested', nested, False)
 
127
        conn.consume_in_thread()
 
128
        value = 42
 
129
        result = self.rpc.call(FLAGS, self.context,
 
130
                          'nested', {"method": "echo",
 
131
                                     "args": {"queue": "test",
 
132
                                              "value": value}})
 
133
        conn.close()
 
134
        self.assertEqual(value, result)
 
135
 
 
136
    def test_call_timeout(self):
 
137
        """Make sure rpc.call will time out"""
 
138
        if not self.supports_timeouts:
 
139
            raise nose.SkipTest(_("RPC backend does not support timeouts"))
 
140
 
 
141
        value = 42
 
142
        self.assertRaises(rpc_common.Timeout,
 
143
                          self.rpc.call,
 
144
                          FLAGS, self.context,
 
145
                          'test',
 
146
                          {"method": "block",
 
147
                           "args": {"value": value}}, timeout=1)
 
148
        try:
 
149
            self.rpc.call(FLAGS, self.context,
 
150
                     'test',
 
151
                     {"method": "block",
 
152
                      "args": {"value": value}},
 
153
                     timeout=1)
 
154
            self.fail("should have thrown Timeout")
 
155
        except rpc_common.Timeout as exc:
 
156
            pass
 
157
 
 
158
 
 
159
class BaseRpcAMQPTestCase(BaseRpcTestCase):
 
160
    """Base test class for all AMQP-based RPC tests"""
 
161
    def test_proxycallback_handles_exceptions(self):
 
162
        """Make sure exceptions unpacking messages don't cause hangs."""
 
163
        orig_unpack = rpc_amqp.unpack_context
 
164
 
 
165
        info = {'unpacked': False}
 
166
 
 
167
        def fake_unpack_context(*args, **kwargs):
 
168
            info['unpacked'] = True
 
169
            raise test.TestingException('moo')
 
170
 
 
171
        self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
 
172
 
 
173
        value = 41
 
174
        self.rpc.cast(FLAGS, self.context, 'test',
 
175
                      {"method": "echo", "args": {"value": value}})
 
176
 
 
177
        # Wait for the cast to complete.
 
178
        for x in xrange(50):
 
179
            if info['unpacked']:
 
180
                break
 
181
            greenthread.sleep(0.1)
 
182
        else:
 
183
            self.fail("Timeout waiting for message to be consued")
 
184
 
 
185
        # Now see if we get a response even though we raised an
 
186
        # exception for the cast above.
 
187
        self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
 
188
 
 
189
        value = 42
 
190
        result = self.rpc.call(FLAGS, self.context, 'test',
 
191
                {"method": "echo",
 
192
                 "args": {"value": value}})
 
193
        self.assertEqual(value, result)
 
194
 
 
195
 
 
196
class TestReceiver(object):
 
197
    """Simple Proxy class so the consumer has methods to call.
 
198
 
 
199
    Uses static methods because we aren't actually storing any state.
 
200
 
 
201
    """
 
202
 
 
203
    @staticmethod
 
204
    def echo(context, value):
 
205
        """Simply returns whatever value is sent in."""
 
206
        LOG.debug(_("Received %s"), value)
 
207
        return value
 
208
 
 
209
    @staticmethod
 
210
    def context(context, value):
 
211
        """Returns dictionary version of context."""
 
212
        LOG.debug(_("Received %s"), context)
 
213
        return context.to_dict()
 
214
 
 
215
    @staticmethod
 
216
    def multicall_three_nones(context, value):
 
217
        yield None
 
218
        yield None
 
219
        yield None
 
220
 
 
221
    @staticmethod
 
222
    def echo_three_times_yield(context, value):
 
223
        yield value
 
224
        yield value + 1
 
225
        yield value + 2
 
226
 
 
227
    @staticmethod
 
228
    def fail(context, value):
 
229
        """Raises an exception with the value sent in."""
 
230
        raise NotImplementedError(value)
 
231
 
 
232
    @staticmethod
 
233
    def fail_converted(context, value):
 
234
        """Raises an exception with the value sent in."""
 
235
        raise exception.ConvertedException(explanation=value)
 
236
 
 
237
    @staticmethod
 
238
    def block(context, value):
 
239
        time.sleep(2)