~ubuntu-branches/ubuntu/vivid/neutron/vivid-updates

« back to all changes in this revision

Viewing changes to neutron/tests/unit/vmware/nsxlib/test_secgroup.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-03-30 11:17:19 UTC
  • mfrom: (1.1.21)
  • Revision ID: package-import@ubuntu.com-20150330111719-h0gx7233p4jkkgfh
Tags: 1:2015.1~b3-0ubuntu1
* New upstream milestone release:
  - d/control: Align version requirements with upstream.
  - d/control: Add new dependency on oslo-log.
  - d/p/*: Rebase.
  - d/control,d/neutron-plugin-hyperv*: Dropped, decomposed into
    separate project upstream.
  - d/control,d/neutron-plugin-openflow*: Dropped, decomposed into
    separate project upstream.
  - d/neutron-common.install: Add neutron-rootwrap-daemon and 
    neutron-keepalived-state-change binaries.
  - d/rules: Ignore neutron-hyperv-agent when installing; only for Windows.
  - d/neutron-plugin-cisco.install: Drop neutron-cisco-cfg-agent as
    decomposed into separate project upstream.
  - d/neutron-plugin-vmware.install: Drop neutron-check-nsx-config and
    neutron-nsx-manage as decomposed into separate project upstream.
  - d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent.
* d/pydist-overrides: Add overrides for oslo packages.
* d/control: Fixup typo in package description (LP: #1263539).
* d/p/fixup-driver-test-execution.patch: Cherry pick fix from upstream VCS
  to support unit test execution in out-of-tree vendor drivers.
* d/neutron-common.postinst: Allow general access to /etc/neutron but limit
  access to root/neutron to /etc/neutron/neutron.conf to support execution
  of unit tests in decomposed vendor drivers.
* d/control: Add dependency on python-neutron-fwaas to neutron-l3-agent
  package.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2014 VMware, Inc.
2
 
#
3
 
# Licensed under the Apache License, Version 2.0 (the "License");
4
 
# you may not use this file except in compliance with the License.
5
 
# You may obtain a copy of the License at
6
 
#
7
 
#    http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
# Unless required by applicable law or agreed to in writing, software
10
 
# distributed under the License is distributed on an "AS IS" BASIS,
11
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
 
# implied.
13
 
# See the License for the specific language governing permissions and
14
 
# limitations under the License.
15
 
#
16
 
 
17
 
from neutron.common import exceptions
18
 
from neutron.plugins.vmware import nsxlib
19
 
from neutron.plugins.vmware.nsxlib import secgroup as secgrouplib
20
 
from neutron.tests.unit import test_api_v2
21
 
from neutron.tests.unit.vmware.nsxlib import base
22
 
 
23
 
# Module-level alias for the helper defined in test_api_v2, so tests can call
# _uuid() directly. Presumably returns a fresh UUID string -- TODO confirm
# against neutron.tests.unit.test_api_v2.
_uuid = test_api_v2._uuid
24
 
 
25
 
 
26
 
class SecurityProfileTestCase(base.NsxlibTestCase):
    """Tests for the security-profile functions exposed by ``secgrouplib``.

    All requests are issued against ``self.fake_cluster`` (not defined in
    this class; presumably provided by ``base.NsxlibTestCase`` -- verify
    against the base class).
    """

    def _fetch_security_profile(self, profile_uuid):
        """Return the security profile ``profile_uuid`` via a raw GET.

        The profile is read back through ``nsxlib.do_request`` rather than
        through ``secgrouplib`` so that the tests verify what was actually
        stored, independently of the code under test.
        """
        return nsxlib.do_request(
            nsxlib.HTTP_GET,
            nsxlib._build_uri_path('security-profile',
                                   resource_id=profile_uuid),
            cluster=self.fake_cluster)

    def test_create_and_get_security_profile(self):
        """A newly created profile is retrievable and has 1 egress and
        2 ingress rules."""
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        sec_prof_res = self._fetch_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check for builtin rules
        self.assertEqual(1, len(sec_prof_res['logical_port_egress_rules']))
        self.assertEqual(2, len(sec_prof_res['logical_port_ingress_rules']))

    def test_create_and_get_default_security_profile(self):
        """A profile named 'default' is created with 3 egress and
        2 ingress rules."""
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'default'})
        sec_prof_res = self._fetch_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check for builtin rules
        self.assertEqual(3, len(sec_prof_res['logical_port_egress_rules']))
        self.assertEqual(2, len(sec_prof_res['logical_port_ingress_rules']))

    def test_update_security_profile_raise_not_found(self):
        """Updating a profile whose uuid does not exist raises NotFound."""
        self.assertRaises(exceptions.NotFound,
                          secgrouplib.update_security_profile,
                          self.fake_cluster,
                          _uuid(), 'tatore_magno(the great)')

    def test_update_security_profile(self):
        """Updating a profile changes its ``display_name``."""
        tenant_id = 'foo_tenant_uuid'
        secgroup_id = 'foo_secgroup_uuid'
        old_sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, tenant_id, secgroup_id,
            {'name': 'tatore_magno'})
        new_sec_prof = secgrouplib.update_security_profile(
            self.fake_cluster, old_sec_prof['uuid'], 'aaron_magno')
        self.assertEqual('aaron_magno', new_sec_prof['display_name'])

    def test_update_security_profile_rules(self):
        """Supplied egress and ingress rules are stored on the profile."""
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        ingress_rule = {'ethertype': 'IPv4'}
        egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
        new_rules = {'logical_port_egress_rules': [egress_rule],
                     'logical_port_ingress_rules': [ingress_rule]}
        secgrouplib.update_security_group_rules(
            self.fake_cluster, sec_prof['uuid'], new_rules)
        sec_prof_res = self._fetch_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check the supplied rules were stored. Two egress rules are
        # expected although only one was supplied; the extra one is
        # presumably added by the library -- TODO confirm in secgrouplib.
        self.assertEqual(2, len(sec_prof_res['logical_port_egress_rules']))
        self.assertIn(egress_rule,
                      sec_prof_res['logical_port_egress_rules'])
        self.assertEqual(1, len(sec_prof_res['logical_port_ingress_rules']))
        self.assertIn(ingress_rule,
                      sec_prof_res['logical_port_ingress_rules'])

    def test_update_security_profile_rules_noingress(self):
        """An empty ingress list still yields one loopback ingress rule."""
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        # Rule expected on the profile even though no ingress rule is
        # supplied below; presumably inserted by the library when the
        # ingress list is empty -- TODO confirm in secgrouplib.
        hidden_ingress_rule = {'ethertype': 'IPv4',
                               'ip_prefix': '127.0.0.1/32'}
        egress_rule = {'ethertype': 'IPv4', 'profile_uuid': 'xyz'}
        new_rules = {'logical_port_egress_rules': [egress_rule],
                     'logical_port_ingress_rules': []}
        secgrouplib.update_security_group_rules(
            self.fake_cluster, sec_prof['uuid'], new_rules)
        sec_prof_res = self._fetch_security_profile(sec_prof['uuid'])
        self.assertEqual(sec_prof['uuid'], sec_prof_res['uuid'])
        # Check the supplied egress rule and the hidden ingress rule.
        self.assertEqual(2, len(sec_prof_res['logical_port_egress_rules']))
        self.assertIn(egress_rule,
                      sec_prof_res['logical_port_egress_rules'])
        self.assertEqual(1, len(sec_prof_res['logical_port_ingress_rules']))
        self.assertIn(hidden_ingress_rule,
                      sec_prof_res['logical_port_ingress_rules'])

    def test_update_non_existing_securityprofile_raises(self):
        """Updating rules on an unknown profile raises NeutronException."""
        self.assertRaises(exceptions.NeutronException,
                          secgrouplib.update_security_group_rules,
                          self.fake_cluster, 'whatever',
                          {'logical_port_egress_rules': [],
                           'logical_port_ingress_rules': []})

    def test_delete_security_profile(self):
        """A deleted profile can no longer be fetched (NotFound)."""
        sec_prof = secgrouplib.create_security_profile(
            self.fake_cluster, _uuid(), 'pippo', {'name': 'test'})
        secgrouplib.delete_security_profile(
            self.fake_cluster, sec_prof['uuid'])
        # The URI is built as an argument, i.e. outside the callable that
        # assertRaises guards, so only the request itself may raise NotFound.
        self.assertRaises(exceptions.NotFound,
                          nsxlib.do_request,
                          nsxlib.HTTP_GET,
                          nsxlib._build_uri_path(
                              'security-profile',
                              resource_id=sec_prof['uuid']),
                          cluster=self.fake_cluster)

    def test_delete_non_existing_securityprofile_raises(self):
        """Deleting an unknown profile raises NeutronException."""
        self.assertRaises(exceptions.NeutronException,
                          secgrouplib.delete_security_profile,
                          self.fake_cluster, 'whatever')