19
19
Unit Tests for remote procedure calls using kombu
22
from nose import SkipTest
22
23
from nova import context
24
from nova import exception
23
25
from nova import flags
24
26
from nova import log as logging
27
from nova.rpc import amqp as rpc_amqp
25
28
from nova import test
26
from nova.rpc import amqp as rpc_amqp
27
from nova.rpc import impl_kombu
28
29
from nova.tests.rpc import common
33
from nova.rpc import impl_kombu
30
39
FLAGS = flags.FLAGS
31
40
LOG = logging.getLogger(__name__)
52
61
class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
65
impl_kombu.register_opts(FLAGS)
55
68
super(RpcKombuTestCase, self).setUp()
70
# Skip tests if we are running in an ubuntu buildd
57
76
def tearDown(self):
59
79
super(RpcKombuTestCase, self).tearDown()
81
@test.skip_if(kombu is None, "Test requires kombu")
61
82
def test_reusing_connection(self):
62
83
"""Test that reusing a connection returns same one."""
63
conn_context = self.rpc.create_connection(new=False)
84
conn_context = self.rpc.create_connection(FLAGS, new=False)
64
85
conn1 = conn_context.connection
65
86
conn_context.close()
66
conn_context = self.rpc.create_connection(new=False)
87
conn_context = self.rpc.create_connection(FLAGS, new=False)
67
88
conn2 = conn_context.connection
68
89
conn_context.close()
69
90
self.assertEqual(conn1, conn2)
92
@test.skip_if(kombu is None, "Test requires kombu")
71
93
def test_topic_send_receive(self):
72
94
"""Test sending to a topic exchange/queue"""
74
conn = self.rpc.create_connection()
96
conn = self.rpc.create_connection(FLAGS)
75
97
message = 'topic test message'
77
99
self.received_message = None
87
109
self.assertEqual(self.received_message, message)
111
@test.skip_if(kombu is None, "Test requires kombu")
89
112
def test_direct_send_receive(self):
90
113
"""Test sending to a direct exchange/queue"""
91
conn = self.rpc.create_connection()
114
conn = self.rpc.create_connection(FLAGS)
92
115
message = 'direct test message'
94
117
self.received_message = None
122
146
def topic_send(_context, topic, msg):
125
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
149
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
126
150
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
128
impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'})
152
impl_kombu.cast(FLAGS, ctxt, 'fake_topic', {'msg': 'fake'})
154
@test.skip_if(kombu is None, "Test requires kombu")
130
155
def test_cast_to_server_uses_server_params(self):
131
156
"""Test kombu rpc.cast"""
152
177
def topic_send(_context, topic, msg):
155
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
180
MyConnection.pool = rpc_amqp.Pool(FLAGS, MyConnection)
156
181
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
158
impl_kombu.cast_to_server(ctxt, server_params,
183
impl_kombu.cast_to_server(FLAGS, ctxt, server_params,
159
184
'fake_topic', {'msg': 'fake'})
161
186
@test.skip_test("kombu memory transport seems buggy with fanout queues "
186
211
self.assertEqual(self.received_message, message)
213
@test.skip_if(kombu is None, "Test requires kombu")
188
214
def test_declare_consumer_errors_will_reconnect(self):
189
215
# Test that any exception with 'timeout' in it causes a
191
217
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
192
218
'__init__', 'foo timeout foo')
194
conn = self.rpc.Connection()
220
conn = self.rpc.Connection(FLAGS)
195
221
result = conn.declare_consumer(self.rpc.DirectConsumer,
196
222
'test_topic', None)
205
231
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectConsumer,
206
232
'__init__', 'meow')
208
conn = self.rpc.Connection()
234
conn = self.rpc.Connection(FLAGS)
209
235
conn.connection_errors = (MyException, )
211
237
result = conn.declare_consumer(self.rpc.DirectConsumer,
214
240
self.assertEqual(info['called'], 2)
215
241
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
243
@test.skip_if(kombu is None, "Test requires kombu")
217
244
def test_declare_consumer_ioerrors_will_reconnect(self):
218
245
"""Test that an IOError exception causes a reconnection"""
219
246
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectConsumer,
220
247
'__init__', 'Socket closed', exc_class=IOError)
222
conn = self.rpc.Connection()
249
conn = self.rpc.Connection(FLAGS)
223
250
result = conn.declare_consumer(self.rpc.DirectConsumer,
224
251
'test_topic', None)
226
253
self.assertEqual(info['called'], 3)
227
254
self.assertTrue(isinstance(result, self.rpc.DirectConsumer))
256
@test.skip_if(kombu is None, "Test requires kombu")
229
257
def test_publishing_errors_will_reconnect(self):
230
258
# Test that any exception with 'timeout' in it causes a
231
259
# reconnection when declaring the publisher class and when
233
261
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
234
262
'__init__', 'foo timeout foo')
236
conn = self.rpc.Connection()
264
conn = self.rpc.Connection(FLAGS)
237
265
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
239
267
self.assertEqual(info['called'], 3)
242
270
info = _raise_exc_stub(self.stubs, 2, self.rpc.DirectPublisher,
243
271
'send', 'foo timeout foo')
245
conn = self.rpc.Connection()
273
conn = self.rpc.Connection(FLAGS)
246
274
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
248
276
self.assertEqual(info['called'], 3)
255
283
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
256
284
'__init__', 'meow')
258
conn = self.rpc.Connection()
286
conn = self.rpc.Connection(FLAGS)
259
287
conn.connection_errors = (MyException, )
261
289
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
266
294
info = _raise_exc_stub(self.stubs, 1, self.rpc.DirectPublisher,
269
conn = self.rpc.Connection()
297
conn = self.rpc.Connection(FLAGS)
270
298
conn.connection_errors = (MyException, )
272
300
conn.publisher_send(self.rpc.DirectPublisher, 'test_topic', 'msg')
274
302
self.assertEqual(info['called'], 2)
276
@test.skip_test("This tests does not timeout on Ubuntu")
304
@test.skip_if(kombu is None, "Test requires kombu")
277
305
def test_iterconsume_errors_will_reconnect(self):
278
conn = self.rpc.Connection()
306
conn = self.rpc.Connection(FLAGS)
279
307
message = 'reconnect test message'
281
309
self.received_message = None
294
322
self.assertEqual(self.received_message, message)
295
323
# Only called once, because our stub goes away during reconnection
296
self.assertEqual(info['called'], 1)
325
@test.skip_if(kombu is None, "Test requires kombu")
326
def test_call_exception(self):
327
"""Test that exception gets passed back properly.
329
rpc.call returns an Exception object. The value of the
330
exception is converted to a string.
333
self.flags(allowed_rpc_exception_modules=['exceptions'])
334
value = "This is the exception message"
335
self.assertRaises(NotImplementedError,
341
"args": {"value": value}})
343
self.rpc.call(FLAGS, self.context,
346
"args": {"value": value}})
347
self.fail("should have thrown Exception")
348
except NotImplementedError as exc:
349
self.assertTrue(value in unicode(exc))
350
#Traceback should be included in exception message
351
self.assertTrue('raise NotImplementedError(value)' in unicode(exc))
353
@test.skip_if(kombu is None, "Test requires kombu")
354
def test_call_converted_exception(self):
355
"""Test that exception gets passed back properly.
357
rpc.call returns an Exception object. The value of the
358
exception is converted to a string.
361
value = "This is the exception message"
362
self.assertRaises(exception.ConvertedException,
367
{"method": "fail_converted",
368
"args": {"value": value}})
370
self.rpc.call(FLAGS, self.context,
372
{"method": "fail_converted",
373
"args": {"value": value}})
374
self.fail("should have thrown Exception")
375
except exception.ConvertedException as exc:
376
self.assertTrue(value in unicode(exc))
377
#Traceback should be included in exception message
378
self.assertTrue('exception.ConvertedException' in unicode(exc))