~ubuntu-cloud-archive/ubuntu/precise/nova/trunk

« back to all changes in this revision

Viewing changes to nova/tests/rpc/common.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, Adam Gandelman
  • Date: 2012-06-22 12:39:57 UTC
  • mfrom: (1.1.57)
  • Revision ID: package-import@ubuntu.com-20120622123957-hbzwg84nt9rqwg8r
Tags: 2012.2~f2~20120621.14517-0ubuntu1
[ Chuck Short ]
* New upstream version.

[ Adam Gandelman ]
* debian/rules: Temporarily disable test suite while blocking
  tests are investigated. 
* debian/patches/kombu_tests_timeout.patch: Dropped.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
#
7
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8
 
#    not use this file except in compliance with the License. You may obtain
9
 
#    a copy of the License at
10
 
#
11
 
#         http://www.apache.org/licenses/LICENSE-2.0
12
 
#
13
 
#    Unless required by applicable law or agreed to in writing, software
14
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
#    License for the specific language governing permissions and limitations
17
 
#    under the License.
18
 
"""
19
 
Unit Tests for remote procedure calls shared between all implementations
20
 
"""
21
 
 
22
 
import time
23
 
 
24
 
import eventlet
25
 
from eventlet import greenthread
26
 
import nose
27
 
 
28
 
from nova import context
29
 
from nova import exception
30
 
from nova import flags
31
 
from nova import log as logging
32
 
from nova.rpc import amqp as rpc_amqp
33
 
from nova.rpc import common as rpc_common
34
 
from nova.rpc import dispatcher as rpc_dispatcher
35
 
from nova import test
36
 
 
37
 
 
38
 
FLAGS = flags.FLAGS
39
 
LOG = logging.getLogger(__name__)
40
 
 
41
 
 
42
 
class BaseRpcTestCase(test.TestCase):
43
 
    def setUp(self, supports_timeouts=True, topic='test',
44
 
              topic_nested='nested'):
45
 
        super(BaseRpcTestCase, self).setUp()
46
 
        self.topic = topic or self.topic
47
 
        self.topic_nested = topic_nested or self.topic_nested
48
 
        self.supports_timeouts = supports_timeouts
49
 
        self.context = context.get_admin_context()
50
 
 
51
 
        if self.rpc:
52
 
            receiver = TestReceiver()
53
 
            self.conn = self._create_consumer(receiver, self.topic)
54
 
 
55
 
    def tearDown(self):
56
 
        if self.rpc:
57
 
            self.conn.close()
58
 
        super(BaseRpcTestCase, self).tearDown()
59
 
 
60
 
    def _create_consumer(self, proxy, topic, fanout=False):
61
 
        dispatcher = rpc_dispatcher.RpcDispatcher([proxy])
62
 
        conn = self.rpc.create_connection(FLAGS, True)
63
 
        conn.create_consumer(topic, dispatcher, fanout)
64
 
        conn.consume_in_thread()
65
 
        return conn
66
 
 
67
 
    def test_call_succeed(self):
68
 
        if not self.rpc:
69
 
            raise nose.SkipTest('rpc driver not available.')
70
 
 
71
 
        value = 42
72
 
        result = self.rpc.call(FLAGS, self.context, self.topic,
73
 
                               {"method": "echo", "args": {"value": value}})
74
 
        self.assertEqual(value, result)
75
 
 
76
 
    def test_call_succeed_despite_multiple_returns_yield(self):
77
 
        if not self.rpc:
78
 
            raise nose.SkipTest('rpc driver not available.')
79
 
 
80
 
        value = 42
81
 
        result = self.rpc.call(FLAGS, self.context, self.topic,
82
 
                          {"method": "echo_three_times_yield",
83
 
                           "args": {"value": value}})
84
 
        self.assertEqual(value + 2, result)
85
 
 
86
 
    def test_multicall_succeed_once(self):
87
 
        if not self.rpc:
88
 
            raise nose.SkipTest('rpc driver not available.')
89
 
 
90
 
        value = 42
91
 
        result = self.rpc.multicall(FLAGS, self.context,
92
 
                              self.topic,
93
 
                              {"method": "echo",
94
 
                               "args": {"value": value}})
95
 
        for i, x in enumerate(result):
96
 
            if i > 0:
97
 
                self.fail('should only receive one response')
98
 
            self.assertEqual(value + i, x)
99
 
 
100
 
    def test_multicall_three_nones(self):
101
 
        if not self.rpc:
102
 
            raise nose.SkipTest('rpc driver not available.')
103
 
 
104
 
        value = 42
105
 
        result = self.rpc.multicall(FLAGS, self.context,
106
 
                              self.topic,
107
 
                              {"method": "multicall_three_nones",
108
 
                               "args": {"value": value}})
109
 
        for i, x in enumerate(result):
110
 
            self.assertEqual(x, None)
111
 
        # i should have been 0, 1, and finally 2:
112
 
        self.assertEqual(i, 2)
113
 
 
114
 
    def test_multicall_succeed_three_times_yield(self):
115
 
        if not self.rpc:
116
 
            raise nose.SkipTest('rpc driver not available.')
117
 
 
118
 
        value = 42
119
 
        result = self.rpc.multicall(FLAGS, self.context,
120
 
                              self.topic,
121
 
                              {"method": "echo_three_times_yield",
122
 
                               "args": {"value": value}})
123
 
        for i, x in enumerate(result):
124
 
            self.assertEqual(value + i, x)
125
 
 
126
 
    def test_context_passed(self):
127
 
        if not self.rpc:
128
 
            raise nose.SkipTest('rpc driver not available.')
129
 
 
130
 
        """Makes sure a context is passed through rpc call."""
131
 
        value = 42
132
 
        result = self.rpc.call(FLAGS, self.context,
133
 
                          self.topic, {"method": "context",
134
 
                                   "args": {"value": value}})
135
 
        self.assertEqual(self.context.to_dict(), result)
136
 
 
137
 
    def _test_cast(self, fanout=False):
138
 
        """Test casts by pushing items through a channeled queue."""
139
 
 
140
 
        # Not a true global, but capitalized so
141
 
        # it is clear it is leaking scope into Nested()
142
 
        QUEUE = eventlet.queue.Queue()
143
 
 
144
 
        if not self.rpc:
145
 
            raise nose.SkipTest('rpc driver not available.')
146
 
 
147
 
        # We use the nested topic so we don't need QUEUE to be a proper
148
 
        # global, and do not keep state outside this test.
149
 
        class Nested(object):
150
 
            @staticmethod
151
 
            def put_queue(context, value):
152
 
                LOG.debug("Got value in put_queue: %s", value)
153
 
                QUEUE.put(value)
154
 
 
155
 
        nested = Nested()
156
 
        conn = self._create_consumer(nested, self.topic_nested, fanout)
157
 
        value = 42
158
 
 
159
 
        method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
160
 
        method(FLAGS, self.context,
161
 
               self.topic_nested,
162
 
               {"method": "put_queue",
163
 
                "args": {"value": value}})
164
 
 
165
 
        try:
166
 
            # If it does not succeed in 2 seconds, give up and assume
167
 
            # failure.
168
 
            result = QUEUE.get(True, 2)
169
 
        except Exception:
170
 
            self.assertEqual(value, None)
171
 
 
172
 
        conn.close()
173
 
        self.assertEqual(value, result)
174
 
 
175
 
    def test_cast_success(self):
176
 
        self._test_cast(False)
177
 
 
178
 
    def test_fanout_success(self):
179
 
        self._test_cast(True)
180
 
 
181
 
    def test_nested_calls(self):
182
 
        if not self.rpc:
183
 
            raise nose.SkipTest('rpc driver not available.')
184
 
 
185
 
        """Test that we can do an rpc.call inside another call."""
186
 
        class Nested(object):
187
 
            @staticmethod
188
 
            def echo(context, queue, value):
189
 
                """Calls echo in the passed queue."""
190
 
                LOG.debug(_("Nested received %(queue)s, %(value)s")
191
 
                        % locals())
192
 
                # TODO(comstud):
193
 
                # so, it will replay the context and use the same REQID?
194
 
                # that's bizarre.
195
 
                ret = self.rpc.call(FLAGS, context,
196
 
                               queue,
197
 
                               {"method": "echo",
198
 
                                "args": {"value": value}})
199
 
                LOG.debug(_("Nested return %s"), ret)
200
 
                return value
201
 
 
202
 
        nested = Nested()
203
 
        conn = self._create_consumer(nested, self.topic_nested)
204
 
 
205
 
        value = 42
206
 
        result = self.rpc.call(FLAGS, self.context,
207
 
                               self.topic_nested,
208
 
                               {"method": "echo",
209
 
                                "args": {"queue": "test", "value": value}})
210
 
        conn.close()
211
 
        self.assertEqual(value, result)
212
 
 
213
 
    def test_call_timeout(self):
214
 
        if not self.rpc:
215
 
            raise nose.SkipTest('rpc driver not available.')
216
 
 
217
 
        """Make sure rpc.call will time out."""
218
 
        if not self.supports_timeouts:
219
 
            raise nose.SkipTest(_("RPC backend does not support timeouts"))
220
 
 
221
 
        value = 42
222
 
        self.assertRaises(rpc_common.Timeout,
223
 
                          self.rpc.call,
224
 
                          FLAGS, self.context,
225
 
                          self.topic,
226
 
                          {"method": "block",
227
 
                           "args": {"value": value}}, timeout=1)
228
 
        try:
229
 
            self.rpc.call(FLAGS, self.context,
230
 
                     self.topic,
231
 
                     {"method": "block",
232
 
                      "args": {"value": value}},
233
 
                     timeout=1)
234
 
            self.fail("should have thrown Timeout")
235
 
        except rpc_common.Timeout as exc:
236
 
            pass
237
 
 
238
 
 
239
 
class BaseRpcAMQPTestCase(BaseRpcTestCase):
240
 
    """Base test class for all AMQP-based RPC tests."""
241
 
    def test_proxycallback_handles_exceptions(self):
242
 
        """Make sure exceptions unpacking messages don't cause hangs."""
243
 
        if not self.rpc:
244
 
            raise nose.SkipTest('rpc driver not available.')
245
 
 
246
 
        orig_unpack = rpc_amqp.unpack_context
247
 
 
248
 
        info = {'unpacked': False}
249
 
 
250
 
        def fake_unpack_context(*args, **kwargs):
251
 
            info['unpacked'] = True
252
 
            raise test.TestingException('moo')
253
 
 
254
 
        self.stubs.Set(rpc_amqp, 'unpack_context', fake_unpack_context)
255
 
 
256
 
        value = 41
257
 
        self.rpc.cast(FLAGS, self.context, self.topic,
258
 
                      {"method": "echo", "args": {"value": value}})
259
 
 
260
 
        # Wait for the cast to complete.
261
 
        for x in xrange(50):
262
 
            if info['unpacked']:
263
 
                break
264
 
            greenthread.sleep(0.1)
265
 
        else:
266
 
            self.fail("Timeout waiting for message to be consumed")
267
 
 
268
 
        # Now see if we get a response even though we raised an
269
 
        # exception for the cast above.
270
 
        self.stubs.Set(rpc_amqp, 'unpack_context', orig_unpack)
271
 
 
272
 
        value = 42
273
 
        result = self.rpc.call(FLAGS, self.context, self.topic,
274
 
                {"method": "echo",
275
 
                 "args": {"value": value}})
276
 
        self.assertEqual(value, result)
277
 
 
278
 
 
279
 
class TestReceiver(object):
280
 
    """Simple Proxy class so the consumer has methods to call.
281
 
 
282
 
    Uses static methods because we aren't actually storing any state.
283
 
 
284
 
    """
285
 
    @staticmethod
286
 
    def echo(context, value):
287
 
        """Simply returns whatever value is sent in."""
288
 
        LOG.debug(_("Received %s"), value)
289
 
        return value
290
 
 
291
 
    @staticmethod
292
 
    def context(context, value):
293
 
        """Returns dictionary version of context."""
294
 
        LOG.debug(_("Received %s"), context)
295
 
        return context.to_dict()
296
 
 
297
 
    @staticmethod
298
 
    def multicall_three_nones(context, value):
299
 
        yield None
300
 
        yield None
301
 
        yield None
302
 
 
303
 
    @staticmethod
304
 
    def echo_three_times_yield(context, value):
305
 
        yield value
306
 
        yield value + 1
307
 
        yield value + 2
308
 
 
309
 
    @staticmethod
310
 
    def fail(context, value):
311
 
        """Raises an exception with the value sent in."""
312
 
        raise NotImplementedError(value)
313
 
 
314
 
    @staticmethod
315
 
    def fail_converted(context, value):
316
 
        """Raises an exception with the value sent in."""
317
 
        raise exception.ConvertedException(explanation=value)
318
 
 
319
 
    @staticmethod
320
 
    def block(context, value):
321
 
        time.sleep(2)