1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2012, Nachi Ueno, NTT MCL, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
21
from quantum.api.v2 import attributes
22
from quantum.extensions import securitygroup as ext_sg
23
from quantum.plugins.linuxbridge.db import l2network_db_v2 as lb_db
24
from quantum.tests.unit import test_extension_security_group as test_sg
26
# Dotted import paths of the LinuxBridge plugin components used by the
# security-group tests in this module.

# Plugin class handed to the parent test case's setUp.
PLUGIN_NAME = ('quantum.plugins.linuxbridge.'
               'lb_quantum_plugin.LinuxBridgePluginV2')

# NOTE(review): the module segment was spelled 'linuxbridg_quantum_agent';
# corrected to 'linuxbridge_quantum_agent' to match the package spelling used
# by every other path here -- confirm against the agent module's file name.
AGENT_NAME = ('quantum.plugins.linuxbridge.'
              'agent.linuxbridge_quantum_agent.LinuxBridgeQuantumAgentRPC')

# Notifier class that LinuxBridgeSecurityGroupsTestCase.setUp replaces with
# a mock.
NOTIFIER = ('quantum.plugins.linuxbridge.'
            'lb_quantum_plugin.AgentNotifierApi')
34
class LinuxBridgeSecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase):
    """Security-group DB test base that runs against the LinuxBridge plugin.

    setUp patches the class found at NOTIFIER so that instantiating it
    yields ``self.notifier`` (a ``mock.Mock``), and snapshots
    ``attributes.RESOURCE_ATTRIBUTE_MAP`` so tearDown can put it back.
    """

    _plugin_name = PLUGIN_NAME

    def setUp(self, plugin=None):
        """Install the notifier mock, back up the attribute map, start up.

        :param plugin: not used by this method; PLUGIN_NAME is always the
                       value passed to the parent setUp.
        """
        # Registered first so every patch started below is stopped even if
        # a later step of setUp raises.
        self.addCleanup(mock.patch.stopall)
        notifier_p = mock.patch(NOTIFIER)
        notifier_cls = notifier_p.start()
        self.notifier = mock.Mock()
        notifier_cls.return_value = self.notifier

        # Per-resource copy of the attribute map, restored in tearDown.
        # NOTE(review): the extracted source ended this expression on a
        # dangling '.'; `.copy()` is the reconstructed call -- confirm.
        self._attribute_map_bk_ = {}
        for item in attributes.RESOURCE_ATTRIBUTE_MAP:
            self._attribute_map_bk_[item] = (
                attributes.RESOURCE_ATTRIBUTE_MAP[item].copy())
        super(LinuxBridgeSecurityGroupsTestCase, self).setUp(PLUGIN_NAME)

    # NOTE(review): this `def` line was absent from the extracted source; it
    # is implied by the super().tearDown() call in the body.
    def tearDown(self):
        """Run the parent teardown, then restore the attribute-map backup."""
        super(LinuxBridgeSecurityGroupsTestCase, self).tearDown()
        attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk_
55
class TestLinuxBridgeSecurityGroups(LinuxBridgeSecurityGroupsTestCase,
                                    test_sg.TestSecurityGroups):
    """Checks the notifier calls made for security-group changes.

    ``self.notifier`` is the mock installed by the base class setUp; each
    test asserts on the calls recorded on it.
    """

    def test_security_group_rule_updated(self):
        """A rule referencing a source group notifies rule_updated twice."""
        # NOTE(review): the literal values of `name`, `direction`,
        # `protocol` and the port range were on lines missing from the
        # extracted source; the values below are reconstructed -- confirm.
        name = 'webservers'
        description = 'my webservers'
        with self.security_group(name, description) as sg:
            with self.security_group(name, description) as sg2:
                security_group_id = sg['security_group']['id']
                direction = "ingress"
                source_group_id = sg2['security_group']['id']
                protocol = 'tcp'
                port_range_min = 88
                port_range_max = 88
                # The rule only needs to exist for the duration of the
                # context; the body is intentionally empty.
                with self.security_group_rule(security_group_id, direction,
                                              protocol, port_range_min,
                                              port_range_max,
                                              source_group_id=source_group_id
                                              ):
                    pass
            # NOTE(review): the argument list of the first expected call was
            # missing; mirrored from the second call -- confirm.
            self.notifier.assert_has_calls(
                [call.security_groups_rule_updated(mock.ANY,
                                                   [security_group_id]),
                 call.security_groups_rule_updated(mock.ANY,
                                                   [security_group_id])])

    def test_security_group_member_updated(self):
        """Assigning a port to a group notifies member_updated."""
        with self.network() as n:
            # NOTE(review): this `with` line was missing from the extracted
            # source; a subnet is assumed here because the port's fixed_ips
            # are used below -- confirm.
            with self.subnet(n):
                with self.security_group() as sg:
                    security_group_id = sg['security_group']['id']
                    res = self._create_port('json', n['network']['id'])
                    port = self.deserialize('json', res)

                    # Same port, now attached to the freshly created group.
                    data = {'port': {'fixed_ips': port['port']['fixed_ips'],
                                     'name': port['port']['name'],
                                     ext_sg.SECURITYGROUP:
                                     [security_group_id]}}

                    req = self.new_update_request('ports', data,
                                                  port['port']['id'])
                    res = self.deserialize('json',
                                           req.get_response(self.api))
                    # assertEqual replaces the deprecated assertEquals alias.
                    self.assertEqual(res['port'][ext_sg.SECURITYGROUP][0],
                                     security_group_id)
                    self._delete('ports', port['port']['id'])
                    self.notifier.assert_has_calls(
                        [call.security_groups_member_updated(
                            mock.ANY, [mock.ANY]),
                         call.security_groups_member_updated(
                             mock.ANY, [security_group_id])])
107
class TestLinuxBridgeSecurityGroupsDB(LinuxBridgeSecurityGroupsTestCase):
    """Tests for ``lb_db.get_port_from_device``."""

    def test_security_group_get_port_from_device(self):
        """A port is found by the first 8 characters of its id."""
        with self.network() as n:
            # NOTE(review): this `with` line was missing from the extracted
            # source; a subnet is assumed here because fixed_ips[0] is read
            # below -- confirm.
            with self.subnet(n):
                with self.security_group() as sg:
                    security_group_id = sg['security_group']['id']
                    res = self._create_port('json', n['network']['id'])
                    port = self.deserialize('json', res)
                    fixed_ips = port['port']['fixed_ips']
                    data = {'port': {'fixed_ips': fixed_ips,
                                     'name': port['port']['name'],
                                     ext_sg.SECURITYGROUP:
                                     [security_group_id]}}

                    req = self.new_update_request('ports', data,
                                                  port['port']['id'])
                    res = self.deserialize('json',
                                           req.get_response(self.api))
                    port_id = res['port']['id']
                    # The lookup key is a prefix of the port id.
                    device_id = port_id[:8]
                    port_dict = lb_db.get_port_from_device(device_id)
                    self.assertEqual(port_id, port_dict['id'])
                    self.assertEqual([security_group_id],
                                     port_dict[ext_sg.SECURITYGROUP])
                    self.assertEqual([], port_dict['security_group_rules'])
                    self.assertEqual([fixed_ips[0]['ip_address']],
                                     port_dict['fixed_ips'])
                    self._delete('ports', port['port']['id'])

    def test_security_group_get_port_from_device_with_no_port(self):
        """An unknown device id yields None."""
        port_dict = lb_db.get_port_from_device('bad_device_id')
        self.assertIsNone(port_dict)