# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17
from neutron.common import exceptions
18
from neutron.plugins.vmware import nsxlib
19
from neutron.plugins.vmware.nsxlib import secgroup as secgrouplib
20
from neutron.tests.unit import test_api_v2
21
from neutron.tests.unit.vmware.nsxlib import base
23
# Module-level alias for the ``_uuid`` helper defined in
# neutron.tests.unit.test_api_v2. The tests below call it wherever a
# throwaway identifier is needed (e.g. the tenant id handed to
# create_security_profile).
# NOTE(review): presumably returns a fresh UUID string on each call --
# confirm in test_api_v2.
_uuid = test_api_v2._uuid
26
class SecurityProfileTestCase(base.NsxlibTestCase):
    """Tests for the secgrouplib security-profile helpers.

    Covers create, update (name and rules) and delete of security
    profiles, reading the stored profile back through
    ``nsxlib.do_request`` to verify the result.

    ``self.fake_cluster`` is not defined in this class; it is expected to
    be provided by ``base.NsxlibTestCase``.
    """

    def _get_security_profile(self, profile_uuid):
        """Fetch a security profile from the fake cluster by its uuid.

        :param profile_uuid: value used as ``resource_id`` in the
            'security-profile' URI.
        :returns: whatever ``nsxlib.do_request`` returns for that URI.
        """
        # NOTE(review): the HTTP verb is passed as the first positional
        # argument on the assumption that do_request forwards its
        # positional arguments as (method, uri) -- confirm against
        # nsxlib.do_request.
        return nsxlib.do_request(
            'GET',
            nsxlib._build_uri_path('security-profile',
                                   resource_id=profile_uuid),
            cluster=self.fake_cluster)

    def test_create_and_get_security_profile(self):
        # A freshly created, non-default profile must be retrievable by
        # its uuid.
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        sec_prof_res = self._get_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check for builtin rules
        self.assertEqual(1, len(sec_prof_res['logical_port_egress_rules']))
        self.assertEqual(2, len(sec_prof_res['logical_port_ingress_rules']))

    def test_create_and_get_default_security_profile(self):
        # Same as above, but for a group named 'default': the stored
        # profile is expected to hold 3 egress rules instead of 1.
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'default'})
        sec_prof_res = self._get_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check for builtin rules
        self.assertEqual(3, len(sec_prof_res['logical_port_egress_rules']))
        self.assertEqual(2, len(sec_prof_res['logical_port_ingress_rules']))

    def test_update_security_profile_raise_not_found(self):
        # Renaming a profile whose uuid was never created must raise
        # NotFound. The cluster is passed first, exactly as in
        # test_update_security_profile, so the call reaches the lookup.
        self.assertRaises(exceptions.NotFound,
                          secgrouplib.update_security_profile,
                          self.fake_cluster,
                          _uuid(), 'tatore_magno(the great)')

    def test_update_security_profile(self):
        # Renaming an existing profile is reflected in 'display_name'.
        tenant_id = 'foo_tenant_uuid'
        secgroup_id = 'foo_secgroup_uuid'
        old_sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, tenant_id, secgroup_id,
            {'name': 'tatore_magno'})
        new_sec_prof = secgrouplib.update_security_profile(
            self.fake_cluster, old_sec_prof['uuid'], 'aaron_magno')
        self.assertEqual('aaron_magno', new_sec_prof['display_name'])

    def test_update_security_profile_rules(self):
        # Replace the rules of an existing profile and read them back.
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        ingress_rule = {'ethertype': 'IPv4'}
        egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
        new_rules = {'logical_port_egress_rules': [egress_rule],
                     'logical_port_ingress_rules': [ingress_rule]}
        secgrouplib.update_security_group_rules(
            self.fake_cluster, sec_prof['uuid'], new_rules)
        sec_prof_res = self._get_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check for builtin rules: the egress list is expected to hold one
        # rule in addition to the one supplied above.
        self.assertEqual(2, len(sec_prof_res['logical_port_egress_rules']))
        self.assertIn(egress_rule,
                      sec_prof_res['logical_port_egress_rules'])
        self.assertEqual(1, len(sec_prof_res['logical_port_ingress_rules']))
        self.assertIn(ingress_rule,
                      sec_prof_res['logical_port_ingress_rules'])

    def test_update_security_profile_rules_noingress(self):
        # Updating with an empty ingress list must still leave exactly one
        # ingress rule on the profile: the 127.0.0.1/32 rule below.
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        hidden_ingress_rule = {'ethertype': 'IPv4',
                               'ip_prefix': '127.0.0.1/32'}
        egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
        new_rules = {'logical_port_egress_rules': [egress_rule],
                     'logical_port_ingress_rules': []}
        secgrouplib.update_security_group_rules(
            self.fake_cluster, sec_prof['uuid'], new_rules)
        sec_prof_res = self._get_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check for builtin rules
        self.assertEqual(2, len(sec_prof_res['logical_port_egress_rules']))
        self.assertIn(egress_rule,
                      sec_prof_res['logical_port_egress_rules'])
        self.assertEqual(1, len(sec_prof_res['logical_port_ingress_rules']))
        self.assertIn(hidden_ingress_rule,
                      sec_prof_res['logical_port_ingress_rules'])

    def test_update_non_existing_securityprofile_raises(self):
        # Updating rules on an unknown profile id must raise a
        # NeutronException (or a subclass of it).
        self.assertRaises(exceptions.NeutronException,
                          secgrouplib.update_security_group_rules,
                          self.fake_cluster, 'whatever',
                          {'logical_port_egress_rules': [],
                           'logical_port_ingress_rules': []})

    def test_delete_security_profile(self):
        # After deletion, fetching the profile must raise NotFound.
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        secgrouplib.delete_security_profile(
            self.fake_cluster, sec_prof['uuid'])
        # assertRaises needs the callable and its arguments, not the
        # result of a call, so the fetch helper is passed unevaluated.
        self.assertRaises(exceptions.NotFound,
                          self._get_security_profile,
                          sec_prof['uuid'])

    def test_delete_non_existing_securityprofile_raises(self):
        # Deleting an unknown profile id must raise a NeutronException
        # (or a subclass of it).
        self.assertRaises(exceptions.NeutronException,
                          secgrouplib.delete_security_profile,
                          self.fake_cluster, 'whatever')