1
# Copyright 2015 Cisco Systems, Inc. All rights reserved.
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4
# not use this file except in compliance with the License. You may obtain
5
# a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
# License for the specific language governing permissions and limitations
19
from neutron.api.v2 import attributes as attr
20
from neutron import context
21
from neutron import manager
22
from neutron.plugins.common import constants as const
23
from neutron.tests.unit import test_l3_plugin
24
from neutron.tests.unit import testlib_plugin
27
from neutron_fwaas.db.cisco import cisco_fwaas_db as csrfw_db
28
from neutron_fwaas.extensions.cisco import csr_firewall_insertion
29
from neutron_fwaas.extensions import firewall
30
from neutron_fwaas.services.firewall.plugins.cisco import cisco_fwaas_plugin
31
from neutron_fwaas.tests.unit.db.firewall import test_db_firewall
32
from oslo_config import cfg
34
# We need the test_l3_plugin to ensure we have a valid port_id corresponding
35
# to a router interface.
36
CORE_PLUGIN_KLASS = 'neutron.tests.unit.test_l3_plugin.TestNoL3NatPlugin'
37
L3_PLUGIN_KLASS = 'neutron.tests.unit.test_l3_plugin.TestL3NatServicePlugin'
38
# the plugin under test
39
CSR_FW_PLUGIN_KLASS = (
40
"neutron_fwaas.services.firewall.plugins.cisco.cisco_fwaas_plugin."
43
extensions_path = neutron_fwaas.extensions.__path__[0] + '/cisco'
46
class CSR1kvFirewallTestExtensionManager(
47
test_l3_plugin.L3TestExtensionManager):
49
def get_resources(self):
50
res = super(CSR1kvFirewallTestExtensionManager, self).get_resources()
51
firewall.RESOURCE_ATTRIBUTE_MAP['firewalls'].update(
52
csr_firewall_insertion.EXTENDED_ATTRIBUTES_2_0['firewalls'])
53
return res + firewall.Firewall.get_resources()
55
def get_actions(self):
58
def get_request_extensions(self):
62
class CSR1kvFirewallTestCaseBase(test_db_firewall.FirewallPluginDbTestCase,
63
testlib_plugin.NotificationSetupHelper,
64
test_l3_plugin.L3NatTestCaseMixin):
66
def setUp(self, core_plugin=None, l3_plugin=None, fw_plugin=None,
68
self.agentapi_delf_p = mock.patch(test_db_firewall.DELETEFW_PATH,
69
create=True, new=test_db_firewall.FakeAgentApi().delete_firewall)
70
self.agentapi_delf_p.start()
71
cfg.CONF.set_override('api_extensions_path', extensions_path)
72
# for these tests we need to enable overlapping ips
73
cfg.CONF.set_default('allow_overlapping_ips', True)
74
cfg.CONF.set_default('max_routes', 3)
75
self.saved_attr_map = {}
76
for resource, attrs in attr.RESOURCE_ATTRIBUTE_MAP.iteritems():
77
self.saved_attr_map[resource] = attrs.copy()
79
core_plugin = CORE_PLUGIN_KLASS
81
l3_plugin = L3_PLUGIN_KLASS
83
fw_plugin = CSR_FW_PLUGIN_KLASS
84
service_plugins = {'l3_plugin_name': l3_plugin,
85
'fw_plugin_name': fw_plugin}
87
ext_mgr = CSR1kvFirewallTestExtensionManager()
88
super(test_db_firewall.FirewallPluginDbTestCase, self).setUp(
89
plugin=core_plugin, service_plugins=service_plugins,
92
self.core_plugin = manager.NeutronManager.get_plugin()
93
self.l3_plugin = manager.NeutronManager.get_service_plugins().get(
95
self.plugin = manager.NeutronManager.get_service_plugins().get(
97
self.callbacks = self.plugin.endpoints[0]
99
self.setup_notification_driver()
101
def restore_attribute_map(self):
102
# Remove the csrfirewallinsertion extension
103
firewall.RESOURCE_ATTRIBUTE_MAP['firewalls'].pop('port_id')
104
firewall.RESOURCE_ATTRIBUTE_MAP['firewalls'].pop('direction')
105
# Restore the original RESOURCE_ATTRIBUTE_MAP
106
attr.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
109
self.restore_attribute_map()
110
super(CSR1kvFirewallTestCaseBase, self).tearDown()
112
def _create_firewall(self, fmt, name, description, firewall_policy_id,
113
admin_state_up=True, expected_res_status=None,
115
tenant_id = kwargs.get('tenant_id', self._tenant_id)
116
port_id = kwargs.get('port_id')
117
direction = kwargs.get('direction')
118
data = {'firewall': {'name': name,
119
'description': description,
120
'firewall_policy_id': firewall_policy_id,
121
'admin_state_up': admin_state_up,
122
'tenant_id': tenant_id}}
124
data['firewall']['port_id'] = port_id
126
data['firewall']['direction'] = direction
127
firewall_req = self.new_create_request('firewalls', data, fmt)
128
firewall_res = firewall_req.get_response(self.ext_api)
129
if expected_res_status:
130
self.assertEqual(expected_res_status, firewall_res.status_int)
134
class TestCiscoFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
137
super(TestCiscoFirewallCallbacks, self).setUp()
138
self.plugin = cisco_fwaas_plugin.CSRFirewallPlugin()
139
self.callbacks = self.plugin.endpoints[0]
141
def test_firewall_deleted(self):
142
ctx = context.get_admin_context()
143
with self.firewall_policy(do_delete=False) as fwp:
144
fwp_id = fwp['firewall_policy']['id']
145
with self.firewall(firewall_policy_id=fwp_id,
146
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
147
do_delete=False) as fw:
148
fw_id = fw['firewall']['id']
149
with ctx.session.begin(subtransactions=True):
150
fw_db = self.plugin._get_firewall(ctx, fw_id)
151
fw_db['status'] = const.PENDING_DELETE
153
res = self.callbacks.firewall_deleted(ctx, fw_id,
156
self.assertRaises(firewall.FirewallNotFound,
157
self.plugin.get_firewall,
161
class TestCiscoFirewallPlugin(CSR1kvFirewallTestCaseBase,
162
csrfw_db.CiscoFirewall_db_mixin):
165
super(TestCiscoFirewallPlugin, self).setUp()
166
self.fake_vendor_ext = {
167
'host_mngt_ip': '1.2.3.4',
168
'host_usr_nm': 'admin',
169
'host_usr_pw': 'cisco',
170
'if_list': {'port': {'id': 0, 'hosting_info': 'csr'},
171
'direction': 'default'}
173
self.mock_get_hosting_info = mock.patch.object(
174
self.plugin, '_get_hosting_info').start()
176
def test_create_csr_firewall(self):
178
with contextlib.nested(
179
self.router(tenant_id=self._tenant_id),
183
body = self._router_interface_action(
188
port_id = body['port_id']
190
self.fake_vendor_ext['if_list']['port']['id'] = port_id
191
self.fake_vendor_ext['if_list']['direction'] = 'inside'
192
self.mock_get_hosting_info.return_value = self.fake_vendor_ext
194
with self.firewall(port_id=body['port_id'],
195
direction='inside') as fw:
196
ctx = context.get_admin_context()
197
fw_id = fw['firewall']['id']
198
csrfw = self.lookup_firewall_csr_association(
200
# cant be in PENDING_XXX state for delete clean up
201
with ctx.session.begin(subtransactions=True):
202
fw_db = self.plugin._get_firewall(ctx, fw_id)
203
fw_db['status'] = const.ACTIVE
205
self._router_interface_action(
211
self.assertEqual('firewall_1', fw['firewall']['name'])
212
self.assertEqual(port_id, csrfw['port_id'])
213
self.assertEqual('inside', csrfw['direction'])
215
def test_create_csr_firewall_only_port_id_specified(self):
217
with contextlib.nested(
218
self.router(tenant_id=self._tenant_id),
222
body = self._router_interface_action(
227
port_id = body['port_id']
229
self.fake_vendor_ext['if_list']['port']['id'] = port_id
230
self.fake_vendor_ext['if_list']['direction'] = None
231
self.mock_get_hosting_info.return_value = self.fake_vendor_ext
233
with self.firewall(port_id=body['port_id']) as fw:
234
ctx = context.get_admin_context()
235
fw_id = fw['firewall']['id']
236
csrfw = self.lookup_firewall_csr_association(
238
# cant be in PENDING_XXX state for delete clean up
239
with ctx.session.begin(subtransactions=True):
240
fw_db = self.plugin._get_firewall(ctx, fw_id)
241
fw_db['status'] = const.ACTIVE
243
self._router_interface_action(
249
self.assertEqual('firewall_1', fw['firewall']['name'])
250
self.assertEqual(port_id, csrfw['port_id'])
251
self.assertEqual(None, csrfw['direction'])
253
def test_create_csr_firewall_no_port_id_no_direction_specified(self):
255
with self.firewall() as fw:
256
ctx = context.get_admin_context()
257
fw_id = fw['firewall']['id']
258
csrfw = self.lookup_firewall_csr_association(
260
# cant be in PENDING_XXX state for delete clean up
261
with ctx.session.begin(subtransactions=True):
262
fw_db = self.plugin._get_firewall(ctx, fw_id)
263
fw_db['status'] = const.ACTIVE
266
self.assertEqual('firewall_1', fw['firewall']['name'])
267
self.assertEqual(None, csrfw)
269
def test_update_csr_firewall(self):
271
with contextlib.nested(
272
self.router(tenant_id=self._tenant_id),
276
body = self._router_interface_action(
281
port_id = body['port_id']
283
self.fake_vendor_ext['if_list']['port']['id'] = port_id
284
self.fake_vendor_ext['if_list']['direction'] = 'inside'
285
self.mock_get_hosting_info.return_value = self.fake_vendor_ext
287
with self.firewall(port_id=body['port_id'],
288
direction='both') as fw:
289
ctx = context.get_admin_context()
290
fw_id = fw['firewall']['id']
291
csrfw = self.lookup_firewall_csr_association(
293
status_data = {'acl_id': 100}
295
res = self.callbacks.set_firewall_status(ctx, fw_id,
296
const.ACTIVE, status_data)
298
# update direction on same port
299
data = {'firewall': {'name': 'firewall_2',
300
'direction': 'both', 'port_id': port_id}}
301
req = self.new_update_request('firewalls', data,
302
fw['firewall']['id'])
303
req.environ['neutron.context'] = context.Context(
305
res = self.deserialize(self.fmt,
306
req.get_response(self.ext_api))
308
csrfw = self.lookup_firewall_csr_association(ctx,
309
fw['firewall']['id'])
311
self.assertEqual('firewall_2', res['firewall']['name'])
312
self.assertEqual(port_id, csrfw['port_id'])
313
self.assertEqual('both', csrfw['direction'])
315
# cant be in PENDING_XXX state for delete clean up
316
with ctx.session.begin(subtransactions=True):
317
fw_db = self.plugin._get_firewall(ctx, fw_id)
318
fw_db['status'] = const.ACTIVE
320
self._router_interface_action(
326
def test_update_csr_firewall_port_id(self):
328
with contextlib.nested(
329
self.router(tenant_id=self._tenant_id),
331
self.subnet(cidr='20.0.0.0/24'),
334
body = self._router_interface_action(
339
port_id1 = body['port_id']
341
body = self._router_interface_action(
346
port_id2 = body['port_id']
348
self.fake_vendor_ext['if_list']['port']['id'] = port_id1
349
self.fake_vendor_ext['if_list']['direction'] = 'inside'
350
self.mock_get_hosting_info.return_value = self.fake_vendor_ext
352
with self.firewall(port_id=port_id1,
353
direction='both') as fw:
354
ctx = context.get_admin_context()
355
fw_id = fw['firewall']['id']
356
status_data = {'acl_id': 100}
358
res = self.callbacks.set_firewall_status(ctx, fw_id,
359
const.ACTIVE, status_data)
361
# update direction on same port
362
data = {'firewall': {'name': 'firewall_2',
363
'direction': 'both', 'port_id': port_id2}}
364
req = self.new_update_request('firewalls', data,
365
fw['firewall']['id'])
366
req.environ['neutron.context'] = context.Context(
368
res = self.deserialize(self.fmt,
369
req.get_response(self.ext_api))
371
csrfw = self.lookup_firewall_csr_association(ctx,
372
fw['firewall']['id'])
374
self.assertEqual('firewall_2', res['firewall']['name'])
375
self.assertEqual(port_id2, csrfw['port_id'])
376
self.assertEqual('both', csrfw['direction'])
378
# cant be in PENDING_XXX state for delete clean up
379
with ctx.session.begin(subtransactions=True):
380
fw_db = self.plugin._get_firewall(ctx, fw_id)
381
fw_db['status'] = const.ACTIVE
383
self._router_interface_action('remove',
387
self._router_interface_action(
393
def test_delete_csr_firewall(self):
395
with contextlib.nested(
396
self.router(tenant_id=self._tenant_id),
399
body = self._router_interface_action(
404
port_id = body['port_id']
406
self.fake_vendor_ext['if_list']['port']['id'] = port_id
407
self.fake_vendor_ext['if_list']['direction'] = 'inside'
408
self.mock_get_hosting_info.return_value = self.fake_vendor_ext
410
with self.firewall(port_id=port_id,
411
direction='inside', do_delete=False) as fw:
412
fw_id = fw['firewall']['id']
413
ctx = context.get_admin_context()
414
csrfw = self.lookup_firewall_csr_association(ctx,
416
self.assertNotEqual(None, csrfw)
417
req = self.new_delete_request('firewalls', fw_id)
418
req.get_response(self.ext_api)
419
with ctx.session.begin(subtransactions=True):
420
fw_db = self.plugin._get_firewall(ctx, fw_id)
421
fw_db['status'] = const.PENDING_DELETE
423
self.callbacks.firewall_deleted(ctx, fw_id)
424
csrfw = self.lookup_firewall_csr_association(ctx,
426
self.assertEqual(None, csrfw)
427
self._router_interface_action(