~ubuntu-branches/ubuntu/vivid/neutron/vivid-updates

« back to all changes in this revision

Viewing changes to neutron/tests/unit/hyperv/test_hyperv_rpcapi.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-03-30 11:17:19 UTC
  • mfrom: (1.1.21)
  • Revision ID: package-import@ubuntu.com-20150330111719-h0gx7233p4jkkgfh
Tags: 1:2015.1~b3-0ubuntu1
* New upstream milestone release:
  - d/control: Align version requirements with upstream.
  - d/control: Add new dependency on oslo-log.
  - d/p/*: Rebase.
  - d/control,d/neutron-plugin-hyperv*: Dropped, decomposed into
    separate project upstream.
  - d/control,d/neutron-plugin-openflow*: Dropped, decomposed into
    separate project upstream.
  - d/neutron-common.install: Add neutron-rootwrap-daemon and 
    neutron-keepalived-state-change binaries.
  - d/rules: Ignore neutron-hyperv-agent when installing; only for Windows.
  - d/neutron-plugin-cisco.install: Drop neutron-cisco-cfg-agent as
    decomposed into separate project upstream.
  - d/neutron-plugin-vmware.install: Drop neutron-check-nsx-config and
    neutron-nsx-manage as decomposed into separate project upstream.
  - d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent.
* d/pydist-overrides: Add overrides for oslo packages.
* d/control: Fixup type in package description (LP: #1263539).
* d/p/fixup-driver-test-execution.patch: Cherry pick fix from upstream VCS
  to support unit test exection in out-of-tree vendor drivers.
* d/neutron-common.postinst: Allow general access to /etc/neutron but limit
  access to root/neutron to /etc/neutron/neutron.conf to support execution
  of unit tests in decomposed vendor drivers.
* d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent
  package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2013 Cloudbase Solutions SRL
2
 
# Copyright 2013 Pedro Navarro Perez
3
 
# All Rights Reserved.
4
 
#
5
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6
 
#    not use this file except in compliance with the License. You may obtain
7
 
#    a copy of the License at
8
 
#
9
 
#         http://www.apache.org/licenses/LICENSE-2.0
10
 
#
11
 
#    Unless required by applicable law or agreed to in writing, software
12
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
 
#    License for the specific language governing permissions and limitations
15
 
#    under the License.
16
 
 
17
 
"""
18
 
Unit Tests for hyperv neutron rpc
19
 
"""
20
 
 
21
 
import contextlib
22
 
 
23
 
import mock
24
 
from oslo_context import context as oslo_context
25
 
 
26
 
from neutron.agent import rpc as agent_rpc
27
 
from neutron.common import topics
28
 
from neutron.plugins.hyperv import agent_notifier_api as ana
29
 
from neutron.plugins.hyperv.common import constants
30
 
from neutron.tests import base
31
 
 
32
 
 
33
 
class rpcHyperVApiTestCase(base.BaseTestCase):
34
 
 
35
 
    def _test_hyperv_neutron_api(
36
 
            self, rpcapi, topic, method, rpc_method, **kwargs):
37
 
        ctxt = oslo_context.RequestContext('fake_user', 'fake_project')
38
 
        expected_retval = 'foo' if rpc_method == 'call' else None
39
 
        expected_version = kwargs.pop('version', None)
40
 
        fanout = kwargs.pop('fanout', False)
41
 
 
42
 
        with contextlib.nested(
43
 
            mock.patch.object(rpcapi.client, rpc_method),
44
 
            mock.patch.object(rpcapi.client, 'prepare'),
45
 
        ) as (
46
 
            rpc_mock, prepare_mock
47
 
        ):
48
 
            prepare_mock.return_value = rpcapi.client
49
 
            rpc_mock.return_value = expected_retval
50
 
            retval = getattr(rpcapi, method)(ctxt, **kwargs)
51
 
 
52
 
        self.assertEqual(retval, expected_retval)
53
 
 
54
 
        prepare_args = {}
55
 
        if expected_version:
56
 
            prepare_args['version'] = expected_version
57
 
        if fanout:
58
 
            prepare_args['fanout'] = True
59
 
        if topic:
60
 
            prepare_args['topic'] = topic
61
 
        prepare_mock.assert_called_once_with(**prepare_args)
62
 
 
63
 
        rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
64
 
 
65
 
    def test_delete_network(self):
66
 
        rpcapi = ana.AgentNotifierApi(topics.AGENT)
67
 
        self._test_hyperv_neutron_api(
68
 
            rpcapi,
69
 
            topics.get_topic_name(
70
 
                topics.AGENT,
71
 
                topics.NETWORK,
72
 
                topics.DELETE),
73
 
            'network_delete', rpc_method='cast', fanout=True,
74
 
            network_id='fake_request_spec')
75
 
 
76
 
    def test_port_update(self):
77
 
        rpcapi = ana.AgentNotifierApi(topics.AGENT)
78
 
        self._test_hyperv_neutron_api(
79
 
            rpcapi,
80
 
            topics.get_topic_name(
81
 
                topics.AGENT,
82
 
                topics.PORT,
83
 
                topics.UPDATE),
84
 
            'port_update', rpc_method='cast', fanout=True,
85
 
            port='fake_port',
86
 
            network_type='fake_network_type',
87
 
            segmentation_id='fake_segmentation_id',
88
 
            physical_network='fake_physical_network')
89
 
 
90
 
    def test_port_delete(self):
91
 
        rpcapi = ana.AgentNotifierApi(topics.AGENT)
92
 
        self._test_hyperv_neutron_api(
93
 
            rpcapi,
94
 
            topics.get_topic_name(
95
 
                topics.AGENT,
96
 
                topics.PORT,
97
 
                topics.DELETE),
98
 
            'port_delete', rpc_method='cast', fanout=True,
99
 
            port_id='port_id')
100
 
 
101
 
    def test_tunnel_update(self):
102
 
        rpcapi = ana.AgentNotifierApi(topics.AGENT)
103
 
        self._test_hyperv_neutron_api(
104
 
            rpcapi,
105
 
            topics.get_topic_name(
106
 
                topics.AGENT,
107
 
                constants.TUNNEL,
108
 
                topics.UPDATE),
109
 
            'tunnel_update', rpc_method='cast', fanout=True,
110
 
            tunnel_ip='fake_ip', tunnel_id='fake_id')
111
 
 
112
 
    def test_device_details(self):
113
 
        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
114
 
        self._test_hyperv_neutron_api(
115
 
            rpcapi, None,
116
 
            'get_device_details', rpc_method='call',
117
 
            device='fake_device',
118
 
            agent_id='fake_agent_id',
119
 
            host='fake_host')
120
 
 
121
 
    def test_devices_details_list(self):
122
 
        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
123
 
        self._test_hyperv_neutron_api(
124
 
            rpcapi, None,
125
 
            'get_devices_details_list', rpc_method='call',
126
 
            devices=['fake_device1', 'fake_device2'],
127
 
            agent_id='fake_agent_id', host='fake_host',
128
 
            version='1.3')
129
 
 
130
 
    def test_update_device_down(self):
131
 
        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
132
 
        self._test_hyperv_neutron_api(
133
 
            rpcapi, None,
134
 
            'update_device_down', rpc_method='call',
135
 
            device='fake_device',
136
 
            agent_id='fake_agent_id',
137
 
            host='fake_host')
138
 
 
139
 
    def test_tunnel_sync(self):
140
 
        rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
141
 
        self._test_hyperv_neutron_api(
142
 
            rpcapi, None,
143
 
            'tunnel_sync', rpc_method='call',
144
 
            tunnel_ip='fake_tunnel_ip',
145
 
            tunnel_type=None)