~danwent/nova/qmanager-new

« back to all changes in this revision

Viewing changes to nova/tests/test_rpc_kombu.py

  • Committer: Dan Wendlandt
  • Date: 2011-09-07 21:27:06 UTC
  • mfrom: (1476.2.54 nova)
  • Revision ID: dan@nicira.com-20110907212706-z0fm75250w0gqg5i
merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2010 United States Government as represented by the
 
4
# Administrator of the National Aeronautics and Space Administration.
 
5
# All Rights Reserved.
 
6
#
 
7
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
8
#    not use this file except in compliance with the License. You may obtain
 
9
#    a copy of the License at
 
10
#
 
11
#         http://www.apache.org/licenses/LICENSE-2.0
 
12
#
 
13
#    Unless required by applicable law or agreed to in writing, software
 
14
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
15
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
16
#    License for the specific language governing permissions and limitations
 
17
#    under the License.
 
18
"""
 
19
Unit Tests for remote procedure calls using kombu
 
20
"""
 
21
 
 
22
from nova import context
 
23
from nova import log as logging
 
24
from nova import test
 
25
from nova.rpc import impl_kombu
 
26
from nova.tests import test_rpc_common
 
27
 
 
28
 
 
29
LOG = logging.getLogger('nova.tests.rpc')
 
30
 
 
31
 
 
32
class RpcKombuTestCase(test_rpc_common._BaseRpcTestCase):
 
33
    def setUp(self):
 
34
        self.rpc = impl_kombu
 
35
        super(RpcKombuTestCase, self).setUp()
 
36
 
 
37
    def tearDown(self):
 
38
        super(RpcKombuTestCase, self).tearDown()
 
39
 
 
40
    def test_reusing_connection(self):
 
41
        """Test that reusing a connection returns same one."""
 
42
        conn_context = self.rpc.create_connection(new=False)
 
43
        conn1 = conn_context.connection
 
44
        conn_context.close()
 
45
        conn_context = self.rpc.create_connection(new=False)
 
46
        conn2 = conn_context.connection
 
47
        conn_context.close()
 
48
        self.assertEqual(conn1, conn2)
 
49
 
 
50
    def test_topic_send_receive(self):
 
51
        """Test sending to a topic exchange/queue"""
 
52
 
 
53
        conn = self.rpc.create_connection()
 
54
        message = 'topic test message'
 
55
 
 
56
        self.received_message = None
 
57
 
 
58
        def _callback(message):
 
59
            self.received_message = message
 
60
 
 
61
        conn.declare_topic_consumer('a_topic', _callback)
 
62
        conn.topic_send('a_topic', message)
 
63
        conn.consume(limit=1)
 
64
        conn.close()
 
65
 
 
66
        self.assertEqual(self.received_message, message)
 
67
 
 
68
    def test_direct_send_receive(self):
 
69
        """Test sending to a direct exchange/queue"""
 
70
        conn = self.rpc.create_connection()
 
71
        message = 'direct test message'
 
72
 
 
73
        self.received_message = None
 
74
 
 
75
        def _callback(message):
 
76
            self.received_message = message
 
77
 
 
78
        conn.declare_direct_consumer('a_direct', _callback)
 
79
        conn.direct_send('a_direct', message)
 
80
        conn.consume(limit=1)
 
81
        conn.close()
 
82
 
 
83
        self.assertEqual(self.received_message, message)
 
84
 
 
85
    @test.skip_test("kombu memory transport seems buggy with fanout queues "
 
86
            "as this test passes when you use rabbit (fake_rabbit=False)")
 
87
    def test_fanout_send_receive(self):
 
88
        """Test sending to a fanout exchange and consuming from 2 queues"""
 
89
 
 
90
        conn = self.rpc.create_connection()
 
91
        conn2 = self.rpc.create_connection()
 
92
        message = 'fanout test message'
 
93
 
 
94
        self.received_message = None
 
95
 
 
96
        def _callback(message):
 
97
            self.received_message = message
 
98
 
 
99
        conn.declare_fanout_consumer('a_fanout', _callback)
 
100
        conn2.declare_fanout_consumer('a_fanout', _callback)
 
101
        conn.fanout_send('a_fanout', message)
 
102
 
 
103
        conn.consume(limit=1)
 
104
        conn.close()
 
105
        self.assertEqual(self.received_message, message)
 
106
 
 
107
        self.received_message = None
 
108
        conn2.consume(limit=1)
 
109
        conn2.close()
 
110
        self.assertEqual(self.received_message, message)