~ubuntu-branches/ubuntu/saucy/nova/saucy-proposed

« back to all changes in this revision

Viewing changes to nova/tests/rpc/test_kombu.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-05-24 13:12:53 UTC
  • mfrom: (1.1.55)
  • Revision ID: package-import@ubuntu.com-20120524131253-ommql08fg1en06ut
Tags: 2012.2~f1-0ubuntu1
* New upstream release.
* Prepare for quantal:
  - Dropped debian/patches/upstream/0006-Use-project_id-in-ec2.cloud._format_image.patch
  - Dropped debian/patches/upstream/0005-Populate-image-properties-with-project_id-again.patch
  - Dropped debian/patches/upstream/0004-Fixed-bug-962840-added-a-test-case.patch
  - Dropped debian/patches/upstream/0003-Allow-unprivileged-RADOS-users-to-access-rbd-volumes.patch
  - Dropped debian/patches/upstream/0002-Stop-libvirt-test-from-deleting-instances-dir.patch
  - Dropped debian/patches/upstream/0001-fix-bug-where-nova-ignores-glance-host-in-imageref.patch 
  - Dropped debian/patches/0001-fix-useexisting-deprecation-warnings.patch
* debian/control: Add python-keystone as a dependency. (LP: #907197)
* debian/patches/kombu_tests_timeout.patch: Refreshed.
* debian/nova.conf, debian/nova-common.postinst: Convert to new ini
  file configuration.
* debian/patches/nova-manage_flagfile_location.patch: Refreshed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
Unit Tests for remote procedure calls using kombu
20
20
"""
21
21
 
 
22
from nose import SkipTest
22
23
from nova import context
 
24
from nova import exception
23
25
from nova import flags
24
26
from nova import log as logging
 
27
from nova.rpc import amqp as rpc_amqp
25
28
from nova import test
26
 
from nova.rpc import amqp as rpc_amqp
27
 
from nova.rpc import impl_kombu
28
29
from nova.tests.rpc import common
29
30
 
 
31
try:
 
32
    import kombu
 
33
    from nova.rpc import impl_kombu
 
34
except ImportError:
 
35
    kombu = None
 
36
    impl_kombu = None
 
37
 
 
38
 
30
39
FLAGS = flags.FLAGS
31
40
LOG = logging.getLogger(__name__)
32
41
 
51
60
 
52
61
class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
53
62
    def setUp(self):
54
 
        self.rpc = impl_kombu
 
63
        if kombu:
 
64
            self.rpc = impl_kombu
 
65
            impl_kombu.register_opts(FLAGS)
 
66
        else:
 
67
            self.rpc = None
55
68
        super(RpcKombuTestCase, self).setUp()
56
69
 
 
70
        # Skip tests if we are running in an ubuntu buildd
 
71
        try:
 
72
            fh = open('/dev/log')
 
73
        except IOError as e:
 
74
            raise SkipTest
 
75
 
57
76
    def tearDown(self):
58
 
        impl_kombu.cleanup()
 
77
        if kombu:
 
78
            impl_kombu.cleanup()
59
79
        super(RpcKombuTestCase, self).tearDown()
60
80
 
 
81
    @test.skip_if(kombu is None, "Test requires kombu")
61
82
    def test_reusing_connection(self):
62
83
        """Test that reusing a connection returns same one."""
63
 
        conn_context = self.rpc.create_connection(new=False)
 
84
        conn_context = self.rpc.create_connection(FLAGS, new=False)
64
85
        conn1 = conn_context.connection
65
86
        conn_context.close()
66
 
        conn_context = self.rpc.create_connection(new=False)
 
87
        conn_context = self.rpc.create_connection(FLAGS, new=False)
67
88
        conn2 = conn_context.connection
68
89
        conn_context.close()
69
90
        self.assertEqual(conn1, conn2)
70
91
 
 
92
    @test.skip_if(kombu is None, "Test requires kombu")
71
93
    def test_topic_send_receive(self):
72
94
        """Test sending to a topic exchange/queue"""
73
95
 
74
 
        conn = self.rpc.create_connection()
 
96
        conn = self.rpc.create_connection(FLAGS)
75
97
        message = 'topic test message'
76
98
 
77
99
        self.received_message = None
86
108
 
87
109
        self.assertEqual(self.received_message, message)
88
110
 
 
111
    @test.skip_if(kombu is None, "Test requires kombu")
89
112
    def test_direct_send_receive(self):
90
113
        """Test sending to a direct exchange/queue"""
91
 
        conn = self.rpc.create_connection()
 
114
        conn = self.rpc.create_connection(FLAGS)
92
115
        message = 'direct test message'
93
116
 
94
117
        self.received_message = None
103
126
 
104
127
        self.assertEqual(self.received_message, message)
105
128
 
 
129
    @test.skip_if(kombu is None, "Test requires kombu")
106
130
    def test_cast_interface_uses_default_options(self):
107
131
        """Test kombu rpc.cast"""
108
132
 
122
146
            def topic_send(_context, topic, msg):
123
147
                pass
124
148
 
125
 
        MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
 
149
        MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
126
150
        self.stubs.Set(impl_kombu, 'Connection', MyConnection)
127
151
 
128
 
        impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'})
 
152
        impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'})
129
153
 
 
154
    @test.skip_if(kombu is None, "Test requires kombu")
130
155
    def test_cast_to_server_uses_server_params(self):
131
156
        """Test kombu rpc.cast"""
132
157
 
152
177
            def topic_send(_context, topic, msg):
153
178
                pass
154
179
 
155
 
        MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
 
180
        MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
156
181
        self.stubs.Set(impl_kombu, 'Connection', MyConnection)
157
182
 
158
 
        impl_kombu.cast_to_server(ctxt, server_params,
 
183
        impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
159
184
                'fake_topic', {'msg': 'fake'})
160
185
 
161
186
    @test.skip_test("kombu memory transport seems buggy with fanout queues "
185
210
        conn2.close()
186
211
        self.assertEqual(self.received_message, message)
187
212
 
 
213
    @test.skip_if(kombu is None, "Test requires kombu")
188
214
    def test_declare_consumer_errors_will_reconnect(self):
189
215
        # Test that any exception with 'timeout' in it causes a
190
216
        # reconnection
191
217
        info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
192
218
                '__init__', 'foo timeout foo')
193
219
 
194
 
        conn = self.rpc.Connection()
 
220
        conn = self.rpc.Connection(FLAGS)
195
221
        result = conn.declare_consumer(self.rpc.DirectConsumer,
196
222
                'test_topic', None)
197
223
 
205
231
        info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
206
232
                '__init__', 'meow')
207
233
 
208
 
        conn = self.rpc.Connection()
 
234
        conn = self.rpc.Connection(FLAGS)
209
235
        conn.connection_errors = (MyException, )
210
236
 
211
237
        result = conn.declare_consumer(self.rpc.DirectConsumer,
214
240
        self.assertEqual(info['called'], 2)
215
241
        self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
216
242
 
 
243
    @test.skip_if(kombu is None, "Test requires kombu")
217
244
    def test_declare_consumer_ioerrors_will_reconnect(self):
218
245
        """Test that an IOError exception causes a reconnection"""
219
246
        info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
220
247
                '__init__', 'Socket closed', exc_class=IOError)
221
248
 
222
 
        conn = self.rpc.Connection()
 
249
        conn = self.rpc.Connection(FLAGS)
223
250
        result = conn.declare_consumer(self.rpc.DirectConsumer,
224
251
                'test_topic', None)
225
252
 
226
253
        self.assertEqual(info['called'], 3)
227
254
        self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
228
255
 
 
256
    @test.skip_if(kombu is None, "Test requires kombu")
229
257
    def test_publishing_errors_will_reconnect(self):
230
258
        # Test that any exception with 'timeout' in it causes a
231
259
        # reconnection when declaring the publisher class and when
233
261
        info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
234
262
                '__init__', 'foo timeout foo')
235
263
 
236
 
        conn = self.rpc.Connection()
 
264
        conn = self.rpc.Connection(FLAGS)
237
265
        conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
238
266
 
239
267
        self.assertEqual(info['called'], 3)
242
270
        info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
243
271
                'send', 'foo timeout foo')
244
272
 
245
 
        conn = self.rpc.Connection()
 
273
        conn = self.rpc.Connection(FLAGS)
246
274
        conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
247
275
 
248
276
        self.assertEqual(info['called'], 3)
255
283
        info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
256
284
                '__init__', 'meow')
257
285
 
258
 
        conn = self.rpc.Connection()
 
286
        conn = self.rpc.Connection(FLAGS)
259
287
        conn.connection_errors = (MyException, )
260
288
 
261
289
        conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
266
294
        info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
267
295
                'send', 'meow')
268
296
 
269
 
        conn = self.rpc.Connection()
 
297
        conn = self.rpc.Connection(FLAGS)
270
298
        conn.connection_errors = (MyException, )
271
299
 
272
300
        conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
273
301
 
274
302
        self.assertEqual(info['called'], 2)
275
303
 
276
 
    @test.skip_test("This tests does not timeout on Ubuntu")
 
304
    @test.skip_if(kombu is None, "Test requires kombu")
277
305
    def test_iterconsume_errors_will_reconnect(self):
278
 
        conn = self.rpc.Connection()
 
306
        conn = self.rpc.Connection(FLAGS)
279
307
        message = 'reconnect test message'
280
308
 
281
309
        self.received_message = None
293
321
 
294
322
        self.assertEqual(self.received_message, message)
295
323
        # Only called once, because our stub goes away during reconnection
296
 
        self.assertEqual(info['called'], 1)
 
324
 
 
325
    @test.skip_if(kombu is None, "Test requires kombu")
 
326
    def test_call_exception(self):
 
327
        """Test that exception gets passed back properly.
 
328
 
 
329
        rpc.call returns an Exception object.  The value of the
 
330
        exception is converted to a string.
 
331
 
 
332
        """
 
333
        self.flags(allowed_rpc_exception_modules=['exceptions'])
 
334
        value = "This is the exception message"
 
335
        self.assertRaises(NotImplementedError,
 
336
                          self.rpc.call,
 
337
                          FLAGS,
 
338
                          self.context,
 
339
                          'test',
 
340
                          {"method": "fail",
 
341
                           "args": {"value": value}})
 
342
        try:
 
343
            self.rpc.call(FLAGS, self.context,
 
344
                     'test',
 
345
                     {"method": "fail",
 
346
                      "args": {"value": value}})
 
347
            self.fail("should have thrown Exception")
 
348
        except NotImplementedError as exc:
 
349
            self.assertTrue(value in unicode(exc))
 
350
            #Traceback should be included in exception message
 
351
            self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
 
352
 
 
353
    @test.skip_if(kombu is None, "Test requires kombu")
 
354
    def test_call_converted_exception(self):
 
355
        """Test that exception gets passed back properly.
 
356
 
 
357
        rpc.call returns an Exception object.  The value of the
 
358
        exception is converted to a string.
 
359
 
 
360
        """
 
361
        value = "This is the exception message"
 
362
        self.assertRaises(exception.ConvertedException,
 
363
                          self.rpc.call,
 
364
                          FLAGS,
 
365
                          self.context,
 
366
                          'test',
 
367
                          {"method": "fail_converted",
 
368
                           "args": {"value": value}})
 
369
        try:
 
370
            self.rpc.call(FLAGS, self.context,
 
371
                     'test',
 
372
                     {"method": "fail_converted",
 
373
                      "args": {"value": value}})
 
374
            self.fail("should have thrown Exception")
 
375
        except exception.ConvertedException as exc:
 
376
            self.assertTrue(value in unicode(exc))
 
377
            #Traceback should be included in exception message
 
378
            self.assertTrue('exception.ConvertedException' in unicode(exc))