1
# Copyright 2015 OpenStack Foundation.
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
22
from neutron.openstack.common import uuidutils
23
from neutron.tests import base
25
# Mocking imports of 3rd party vyatta library in unit tests and all modules
26
# that depend on this library. Import will fail if not mocked and 3rd party
27
# vyatta library is not installed.
28
with mock.patch.dict(sys.modules, {
29
'networking_brocade': mock.Mock(),
30
'networking_brocade.vyatta': mock.Mock(),
31
'networking_brocade.vyatta.common': mock.Mock(),
32
'networking_brocade.vyatta.vrouter': mock.Mock(),
34
from networking_brocade.vyatta.vrouter import client as vyatta_client
35
from neutron_fwaas.services.firewall.agents.vyatta import vyatta_utils
36
from neutron_fwaas.services.firewall.drivers.vyatta import vyatta_fwaas
38
_uuid = uuidutils.generate_uuid
40
FAKE_FW_UUID = _uuid()
43
def fake_cmd(*args, **kwargs):
47
class VyattaFwaasTestCase(base.BaseTestCase):
49
super(VyattaFwaasTestCase, self).setUp()
51
mock.patch.object(vyatta_client, 'SetCmd', fake_cmd).start()
52
mock.patch.object(vyatta_client, 'DeleteCmd', fake_cmd).start()
54
self.fwaas_driver = vyatta_fwaas.VyattaFirewallDriver()
56
self.fake_rules = [self._make_fake_fw_rule()]
57
self.fake_firewall = self._make_fake_firewall(self.fake_rules)
58
self.fake_firewall_name = vyatta_utils.get_firewall_name(
59
None, self.fake_firewall)
60
self.fake_apply_list = [self._make_fake_router_info()]
61
self.fake_agent_mode = None
63
def test_create_firewall(self):
    """create_firewall must hand its arguments straight to update_firewall.

    update_firewall is patched on the driver instance so only the
    delegation is checked: one call, with the same agent mode, apply list
    and firewall that were given to create_firewall.
    """
    driver = self.fwaas_driver
    call_args = (
        self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)

    with mock.patch.object(driver, 'update_firewall') as fw_update:
        driver.create_firewall(*call_args)

        fw_update.assert_called_once_with(*call_args)
73
def test_update_firewall(self):
    """update_firewall dispatches on the firewall's 'admin_state_up' flag.

    With the flag set, _update_firewall is expected to receive the apply
    list and the firewall. With the flag cleared, apply_default_policy is
    expected to receive the agent mode, apply list and firewall.

    The driver's update_firewall is called directly (rather than through
    create_firewall) so that a failure here points at the method this
    test is named after.
    """
    driver = self.fwaas_driver
    call_args = (
        self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)

    # Firewall administratively up: the internal update path is taken.
    with mock.patch.object(driver, '_update_firewall') as fw_update:
        self.fake_firewall['admin_state_up'] = True
        driver.update_firewall(*call_args)

        fw_update.assert_called_once_with(
            self.fake_apply_list, self.fake_firewall)

    # Firewall administratively down: the default policy is applied.
    with mock.patch.object(
            driver, 'apply_default_policy') as fw_apply_policy:
        self.fake_firewall['admin_state_up'] = False
        driver.update_firewall(*call_args)

        fw_apply_policy.assert_called_once_with(*call_args)
92
def test_delete_firewall(self):
    """delete_firewall must delegate to apply_default_policy.

    apply_default_policy is patched on the driver instance; it has to be
    called exactly once with the same agent mode, apply list and firewall
    that were passed to delete_firewall.
    """
    driver = self.fwaas_driver
    call_args = (
        self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)

    with mock.patch.object(
            driver, 'apply_default_policy') as fw_apply_policy:
        driver.delete_firewall(*call_args)

        fw_apply_policy.assert_called_once_with(*call_args)
101
def test_apply_default_policy(self):
    """apply_default_policy must call _delete_firewall for each router.

    _delete_firewall is patched on the driver instance; one
    (router_info, firewall) call is expected for every entry of the
    apply list.
    """
    driver = self.fwaas_driver

    with mock.patch.object(driver, '_delete_firewall') as fw_delete:
        driver.apply_default_policy(
            self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)

        expected_calls = [mock.call(router_info, self.fake_firewall)
                          for router_info in self.fake_apply_list]
        fw_delete.assert_has_calls(expected_calls)
111
def test_update_firewall_internal(self):
    """_update_firewall must delete and set up the firewall per router.

    Both _delete_firewall and _setup_firewall are patched on the driver
    instance; each is expected to be called with (router_info, firewall)
    for every entry of the apply list.
    """
    driver = self.fwaas_driver

    with mock.patch.object(
            driver, '_delete_firewall'
    ) as fw_delete, mock.patch.object(
            driver, '_setup_firewall') as fw_setup:
        driver._update_firewall(self.fake_apply_list, self.fake_firewall)

        expected_calls = [mock.call(router_info, self.fake_firewall)
                          for router_info in self.fake_apply_list]

        fw_delete.assert_has_calls(expected_calls)
        fw_setup.assert_has_calls(expected_calls)
125
def test_setup_firewall_internal(self):
126
fake_rule = self._make_fake_fw_rule()
127
fake_router_info = self._make_fake_router_info()
128
fake_rule_cmd = 'fake-fw-rule0'
129
fake_zone_configure_rules = ['fake-config-rule0']
131
mock_api = mock.Mock()
132
mock_api_gen = mock.Mock(return_value=mock_api)
133
mock_get_firewall_rule = mock.Mock(return_value=[fake_rule_cmd])
134
mock_get_zone_cmds = mock.Mock(return_value=fake_zone_configure_rules)
135
with contextlib.nested(
137
self.fwaas_driver, '_get_vyatta_client', mock_api_gen),
139
vyatta_fwaas.vyatta_utils, 'get_zone_cmds',
142
self.fwaas_driver, '_set_firewall_rule',
143
mock_get_firewall_rule)):
144
self.fwaas_driver._setup_firewall(
145
fake_router_info, self.fake_firewall)
147
mock_api_gen.assert_called_once_with(
148
fake_router_info.router)
149
mock_get_firewall_rule.assert_called_once_with(
150
self.fake_firewall_name, 1, fake_rule)
151
mock_get_zone_cmds.assert_called_once_with(
152
mock_api, fake_router_info, self.fake_firewall_name)
155
vyatta_client.SetCmd(
156
vyatta_fwaas.FW_NAME.format(
157
self.fake_firewall_name)),
158
vyatta_client.SetCmd(
159
vyatta_fwaas.FW_DESCRIPTION.format(
160
self.fake_firewall_name,
161
urllib.quote_plus(self.fake_firewall['description']))),
162
vyatta_client.SetCmd(
163
vyatta_fwaas.FW_ESTABLISHED_ACCEPT),
164
vyatta_client.SetCmd(
165
vyatta_fwaas.FW_RELATED_ACCEPT),
167
] + fake_zone_configure_rules
168
mock_api.exec_cmd_batch.assert_called_once_with(cmds)
170
def test_delete_firewall_internal(self):
171
fake_router_info = self._make_fake_router_info()
173
with mock.patch.object(
175
'_get_vyatta_client') as mock_client_factory:
176
mock_api = mock_client_factory.return_value
178
self.fwaas_driver._delete_firewall(
179
fake_router_info, self.fake_firewall)
182
vyatta_client.DeleteCmd("zone-policy"),
183
vyatta_client.DeleteCmd(vyatta_fwaas.FW_NAME.format(
184
self.fake_firewall_name)),
185
vyatta_client.DeleteCmd("firewall/state-policy"),
187
mock_api.exec_cmd_batch.assert_called_once_with(cmds)
189
def test_set_firewall_rule_internal(self):
190
fake_rule = self._make_fake_fw_rule()
191
fake_firewall_name = 'fake-fw-name'
194
'description': 'rule description',
195
'source_port': '2080',
196
'destination_ip_address': '172.16.1.1'
202
cmds_actual = self.fwaas_driver._set_firewall_rule(
203
fake_firewall_name, 1, fake_rule)
205
vyatta_client.SetCmd(
206
vyatta_fwaas.FW_RULE_DESCRIPTION.format(
207
urllib.quote_plus(fake_firewall_name), 1,
208
urllib.quote_plus(fake_rule['description'])))
212
('protocol', vyatta_fwaas.FW_RULE_PROTOCOL),
213
('source_port', vyatta_fwaas.FW_RULE_SRC_PORT),
214
('destination_port', vyatta_fwaas.FW_RULE_DEST_PORT),
215
('source_ip_address', vyatta_fwaas.FW_RULE_SRC_ADDR),
216
('destination_ip_address', vyatta_fwaas.FW_RULE_DEST_ADDR),
219
for key, url in rules:
220
cmds_expect.append(vyatta_client.SetCmd(
222
urllib.quote_plus(fake_firewall_name), 1,
223
urllib.quote_plus(fake_rule[key]))))
225
cmds_expect.append(vyatta_client.SetCmd(
226
vyatta_fwaas.FW_RULE_ACTION.format(
227
urllib.quote_plus(fake_firewall_name), 1,
228
action_map.get(fake_rule['action'], 'drop'))))
230
self.assertEqual(cmds_expect, cmds_actual)
232
def _make_fake_router_info(self):
235
'id': 'fake-router-id',
236
'tenant_id': 'tenant-uuid',
240
def _make_fake_fw_rule(self):
246
'destination_port': '80',
247
'source_ip_address': '10.24.4.2'}
249
def _make_fake_firewall(self, rules):
    """Build the firewall dict used as a fixture by the driver tests.

    :param rules: value stored under the 'firewall_rule_list' key.
    :returns: a new dict carrying the module-level FAKE_FW_UUID as 'id',
              'admin_state_up' set to True, and fixed name, tenant and
              description strings.
    """
    return {
        'id': FAKE_FW_UUID,
        'admin_state_up': True,
        'name': 'test-firewall',
        'tenant_id': 'tenant-uuid',
        'description': 'Fake firewall',
        'firewall_rule_list': rules,
    }