~ubuntu-branches/ubuntu/vivid/neutron-fwaas/vivid-updates

« back to all changes in this revision

Viewing changes to neutron_fwaas/tests/unit/services/firewall/plugins/cisco/test_cisco_fwaas_plugin.py

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2015-03-30 11:19:40 UTC
  • mfrom: (1.1.2)
  • Revision ID: package-import@ubuntu.com-20150330111940-2v7kca772me1xuif
Tags: 2015.1~b3-0ubuntu1
* New upstream milestone release:
  - d/control: Align version requirement with upstream, add new dependency
    on oslo-log.
* Enable unit test suite execution:
  - d/control: Switch BD on python-neutron -> neutron-common.
  - d/rules: Enable execution of unit tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2015 Cisco Systems, Inc.  All rights reserved.
 
2
#
 
3
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
4
#    not use this file except in compliance with the License. You may obtain
 
5
#    a copy of the License at
 
6
#
 
7
#         http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
#    Unless required by applicable law or agreed to in writing, software
 
10
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
11
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
12
#    License for the specific language governing permissions and limitations
 
13
#    under the License.
 
14
#
 
15
 
 
16
import contextlib
 
17
import mock
 
18
 
 
19
from neutron.api.v2 import attributes as attr
 
20
from neutron import context
 
21
from neutron import manager
 
22
from neutron.plugins.common import constants as const
 
23
from neutron.tests.unit import test_l3_plugin
 
24
from neutron.tests.unit import testlib_plugin
 
25
 
 
26
import neutron_fwaas
 
27
from neutron_fwaas.db.cisco import cisco_fwaas_db as csrfw_db
 
28
from neutron_fwaas.extensions.cisco import csr_firewall_insertion
 
29
from neutron_fwaas.extensions import firewall
 
30
from neutron_fwaas.services.firewall.plugins.cisco import cisco_fwaas_plugin
 
31
from neutron_fwaas.tests.unit.db.firewall import test_db_firewall
 
32
from oslo_config import cfg
 
33
 
 
34
# The test_l3_plugin classes are used so that the tests have a valid port_id
# corresponding to a router interface.
CORE_PLUGIN_KLASS = 'neutron.tests.unit.test_l3_plugin.TestNoL3NatPlugin'
L3_PLUGIN_KLASS = 'neutron.tests.unit.test_l3_plugin.TestL3NatServicePlugin'

# Dotted path of the plugin under test.
CSR_FW_PLUGIN_KLASS = (
    "neutron_fwaas.services.firewall.plugins.cisco.cisco_fwaas_plugin."
    "CSRFirewallPlugin"
)

# Path handed to the 'api_extensions_path' config option by the test base
# class: the 'cisco' sub-directory of the neutron_fwaas extensions package.
extensions_path = neutron_fwaas.extensions.__path__[0] + '/cisco'
 
44
 
 
45
 
 
46
class CSR1kvFirewallTestExtensionManager(
        test_l3_plugin.L3TestExtensionManager):
    """Extension manager serving L3 resources plus CSR-extended firewalls."""

    def get_resources(self):
        """Return the parent's resources followed by the firewall resources.

        Side effect: the 'firewalls' entry of the firewall extension's
        RESOURCE_ATTRIBUTE_MAP is updated in place with the attributes
        declared by the csr_firewall_insertion extension before the firewall
        resources are built.
        """
        l3_resources = super(
            CSR1kvFirewallTestExtensionManager, self).get_resources()
        csr_attrs = csr_firewall_insertion.EXTENDED_ATTRIBUTES_2_0['firewalls']
        firewall.RESOURCE_ATTRIBUTE_MAP['firewalls'].update(csr_attrs)
        fw_resources = firewall.Firewall.get_resources()
        return l3_resources + fw_resources

    def get_actions(self):
        """Return an empty list: this manager contributes no actions."""
        return []

    def get_request_extensions(self):
        """Return an empty list: no request extensions are contributed."""
        return []
 
60
 
 
61
 
 
62
class CSR1kvFirewallTestCaseBase(test_db_firewall.FirewallPluginDbTestCase,
                                 testlib_plugin.NotificationSetupHelper,
                                 test_l3_plugin.L3NatTestCaseMixin):
    """Base test case loading a core, an L3 and the CSR firewall plugin."""

    def setUp(self, core_plugin=None, l3_plugin=None, fw_plugin=None,
              ext_mgr=None):
        """Configure and load the plugins used by the CSR firewall tests.

        :param core_plugin: dotted path of the core plugin; falls back to
            CORE_PLUGIN_KLASS when falsy.
        :param l3_plugin: dotted path of the L3 service plugin; falls back
            to L3_PLUGIN_KLASS only when it is None.
        :param fw_plugin: dotted path of the firewall plugin; falls back to
            CSR_FW_PLUGIN_KLASS when falsy.
        :param ext_mgr: extension manager; a
            CSR1kvFirewallTestExtensionManager is created when falsy.
        """
        # Patch the agent API delete path with the fake agent's
        # delete_firewall (create=True: the target may not exist yet).
        self.agentapi_delf_p = mock.patch(
            test_db_firewall.DELETEFW_PATH,
            create=True,
            new=test_db_firewall.FakeAgentApi().delete_firewall)
        self.agentapi_delf_p.start()
        cfg.CONF.set_override('api_extensions_path', extensions_path)
        # for these tests we need to enable overlapping ips
        cfg.CONF.set_default('allow_overlapping_ips', True)
        cfg.CONF.set_default('max_routes', 3)
        # Keep a shallow copy of every resource's attribute dict so that
        # restore_attribute_map() can put the global map back afterwards.
        # .items() is used instead of the Python 2-only .iteritems().
        self.saved_attr_map = {
            resource: attrs.copy()
            for resource, attrs in attr.RESOURCE_ATTRIBUTE_MAP.items()}
        if not core_plugin:
            core_plugin = CORE_PLUGIN_KLASS
        if l3_plugin is None:
            l3_plugin = L3_PLUGIN_KLASS
        if not fw_plugin:
            fw_plugin = CSR_FW_PLUGIN_KLASS
        service_plugins = {'l3_plugin_name': l3_plugin,
                           'fw_plugin_name': fw_plugin}
        if not ext_mgr:
            ext_mgr = CSR1kvFirewallTestExtensionManager()
        # NOTE: super() is anchored at FirewallPluginDbTestCase, so that
        # class's own setUp is skipped and the next setUp in the MRO runs.
        super(test_db_firewall.FirewallPluginDbTestCase, self).setUp(
            plugin=core_plugin, service_plugins=service_plugins,
            ext_mgr=ext_mgr)

        self.core_plugin = manager.NeutronManager.get_plugin()
        self.l3_plugin = manager.NeutronManager.get_service_plugins().get(
            const.L3_ROUTER_NAT)
        self.plugin = manager.NeutronManager.get_service_plugins().get(
            const.FIREWALL)
        self.callbacks = self.plugin.endpoints[0]

        self.setup_notification_driver()

    def restore_attribute_map(self):
        """Undo the attribute-map changes made for these tests."""
        # Remove the csrfirewallinsertion extension.  A default is passed to
        # pop() so that a missing key cannot prevent the restore below.
        fw_attrs = firewall.RESOURCE_ATTRIBUTE_MAP['firewalls']
        fw_attrs.pop('port_id', None)
        fw_attrs.pop('direction', None)
        # Restore the original RESOURCE_ATTRIBUTE_MAP
        attr.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map

    def tearDown(self):
        """Restore the attribute maps, then run the inherited tearDown."""
        try:
            self.restore_attribute_map()
        finally:
            # Always run the inherited cleanup, even if the restore failed.
            super(CSR1kvFirewallTestCaseBase, self).tearDown()

    def _create_firewall(self, fmt, name, description, firewall_policy_id,
                         admin_state_up=True, expected_res_status=None,
                         **kwargs):
        """POST a firewall create request and return the raw response.

        'tenant_id' (default: self._tenant_id), 'port_id' and 'direction'
        are read from kwargs; the latter two are only included in the
        request body when truthy.  When expected_res_status is truthy the
        response status is asserted to equal it.
        """
        tenant_id = kwargs.get('tenant_id', self._tenant_id)
        fw_body = {'name': name,
                   'description': description,
                   'firewall_policy_id': firewall_policy_id,
                   'admin_state_up': admin_state_up,
                   'tenant_id': tenant_id}
        for optional_key in ('port_id', 'direction'):
            value = kwargs.get(optional_key)
            if value:
                fw_body[optional_key] = value
        data = {'firewall': fw_body}
        firewall_req = self.new_create_request('firewalls', data, fmt)
        firewall_res = firewall_req.get_response(self.ext_api)
        if expected_res_status:
            self.assertEqual(expected_res_status, firewall_res.status_int)
        return firewall_res
 
132
 
 
133
 
 
134
class TestCiscoFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
    """Tests for the first endpoint exposed by CSRFirewallPlugin."""

    def setUp(self):
        """Instantiate the CSR plugin and grab its first endpoint."""
        super(TestCiscoFirewallCallbacks, self).setUp()
        csr_plugin = cisco_fwaas_plugin.CSRFirewallPlugin()
        self.plugin = csr_plugin
        self.callbacks = csr_plugin.endpoints[0]

    def test_firewall_deleted(self):
        """firewall_deleted() on a PENDING_DELETE firewall removes it."""
        admin_ctx = context.get_admin_context()
        # do_delete=False on both fixtures: the callback under test is what
        # is expected to delete the firewall.
        with self.firewall_policy(do_delete=False) as policy:
            policy_id = policy['firewall_policy']['id']
            with self.firewall(firewall_policy_id=policy_id,
                               admin_state_up=test_db_firewall.ADMIN_STATE_UP,
                               do_delete=False) as created:
                firewall_id = created['firewall']['id']
                with admin_ctx.session.begin(subtransactions=True):
                    db_row = self.plugin._get_firewall(admin_ctx, firewall_id)
                    db_row['status'] = const.PENDING_DELETE
                    admin_ctx.session.flush()
                    deleted = self.callbacks.firewall_deleted(
                        admin_ctx, firewall_id, host='dummy')
                    self.assertTrue(deleted)
                    self.assertRaises(firewall.FirewallNotFound,
                                      self.plugin.get_firewall,
                                      admin_ctx, firewall_id)
 
159
 
 
160
 
 
161
class TestCiscoFirewallPlugin(CSR1kvFirewallTestCaseBase,
 
162
                              csrfw_db.CiscoFirewall_db_mixin):
 
163
 
 
164
    def setUp(self):
 
165
        super(TestCiscoFirewallPlugin, self).setUp()
 
166
        self.fake_vendor_ext = {
 
167
            'host_mngt_ip': '1.2.3.4',
 
168
            'host_usr_nm': 'admin',
 
169
            'host_usr_pw': 'cisco',
 
170
            'if_list': {'port': {'id': 0, 'hosting_info': 'csr'},
 
171
                        'direction': 'default'}
 
172
        }
 
173
        self.mock_get_hosting_info = mock.patch.object(
 
174
            self.plugin, '_get_hosting_info').start()
 
175
 
 
176
    def test_create_csr_firewall(self):
 
177
 
 
178
        with contextlib.nested(
 
179
            self.router(tenant_id=self._tenant_id),
 
180
            self.subnet(),
 
181
        ) as (r, s):
 
182
 
 
183
            body = self._router_interface_action(
 
184
                'add',
 
185
                r['router']['id'],
 
186
                s['subnet']['id'],
 
187
                None)
 
188
            port_id = body['port_id']
 
189
 
 
190
            self.fake_vendor_ext['if_list']['port']['id'] = port_id
 
191
            self.fake_vendor_ext['if_list']['direction'] = 'inside'
 
192
            self.mock_get_hosting_info.return_value = self.fake_vendor_ext
 
193
 
 
194
            with self.firewall(port_id=body['port_id'],
 
195
                direction='inside') as fw:
 
196
                ctx = context.get_admin_context()
 
197
                fw_id = fw['firewall']['id']
 
198
                csrfw = self.lookup_firewall_csr_association(
 
199
                    ctx, fw_id)
 
200
                # cant be in PENDING_XXX state for delete clean up
 
201
                with ctx.session.begin(subtransactions=True):
 
202
                    fw_db = self.plugin._get_firewall(ctx, fw_id)
 
203
                    fw_db['status'] = const.ACTIVE
 
204
                    ctx.session.flush()
 
205
            self._router_interface_action(
 
206
                'remove',
 
207
                r['router']['id'],
 
208
                s['subnet']['id'],
 
209
                None)
 
210
 
 
211
            self.assertEqual('firewall_1', fw['firewall']['name'])
 
212
            self.assertEqual(port_id, csrfw['port_id'])
 
213
            self.assertEqual('inside', csrfw['direction'])
 
214
 
 
215
    def test_create_csr_firewall_only_port_id_specified(self):
 
216
 
 
217
        with contextlib.nested(
 
218
            self.router(tenant_id=self._tenant_id),
 
219
            self.subnet(),
 
220
        ) as (r, s):
 
221
 
 
222
            body = self._router_interface_action(
 
223
                'add',
 
224
                r['router']['id'],
 
225
                s['subnet']['id'],
 
226
                None)
 
227
            port_id = body['port_id']
 
228
 
 
229
            self.fake_vendor_ext['if_list']['port']['id'] = port_id
 
230
            self.fake_vendor_ext['if_list']['direction'] = None
 
231
            self.mock_get_hosting_info.return_value = self.fake_vendor_ext
 
232
 
 
233
            with self.firewall(port_id=body['port_id']) as fw:
 
234
                ctx = context.get_admin_context()
 
235
                fw_id = fw['firewall']['id']
 
236
                csrfw = self.lookup_firewall_csr_association(
 
237
                    ctx, fw_id)
 
238
                # cant be in PENDING_XXX state for delete clean up
 
239
                with ctx.session.begin(subtransactions=True):
 
240
                    fw_db = self.plugin._get_firewall(ctx, fw_id)
 
241
                    fw_db['status'] = const.ACTIVE
 
242
                    ctx.session.flush()
 
243
            self._router_interface_action(
 
244
                'remove',
 
245
                r['router']['id'],
 
246
                s['subnet']['id'],
 
247
                None)
 
248
 
 
249
            self.assertEqual('firewall_1', fw['firewall']['name'])
 
250
            self.assertEqual(port_id, csrfw['port_id'])
 
251
            self.assertEqual(None, csrfw['direction'])
 
252
 
 
253
    def test_create_csr_firewall_no_port_id_no_direction_specified(self):
 
254
 
 
255
        with self.firewall() as fw:
 
256
            ctx = context.get_admin_context()
 
257
            fw_id = fw['firewall']['id']
 
258
            csrfw = self.lookup_firewall_csr_association(
 
259
                ctx, fw_id)
 
260
            # cant be in PENDING_XXX state for delete clean up
 
261
            with ctx.session.begin(subtransactions=True):
 
262
                fw_db = self.plugin._get_firewall(ctx, fw_id)
 
263
                fw_db['status'] = const.ACTIVE
 
264
                ctx.session.flush()
 
265
 
 
266
            self.assertEqual('firewall_1', fw['firewall']['name'])
 
267
            self.assertEqual(None, csrfw)
 
268
 
 
269
    def test_update_csr_firewall(self):
 
270
 
 
271
        with contextlib.nested(
 
272
            self.router(tenant_id=self._tenant_id),
 
273
            self.subnet(),
 
274
        ) as (r, s):
 
275
 
 
276
            body = self._router_interface_action(
 
277
                'add',
 
278
                r['router']['id'],
 
279
                s['subnet']['id'],
 
280
                None)
 
281
            port_id = body['port_id']
 
282
 
 
283
            self.fake_vendor_ext['if_list']['port']['id'] = port_id
 
284
            self.fake_vendor_ext['if_list']['direction'] = 'inside'
 
285
            self.mock_get_hosting_info.return_value = self.fake_vendor_ext
 
286
 
 
287
            with self.firewall(port_id=body['port_id'],
 
288
                 direction='both') as fw:
 
289
                ctx = context.get_admin_context()
 
290
                fw_id = fw['firewall']['id']
 
291
                csrfw = self.lookup_firewall_csr_association(
 
292
                    ctx, fw_id)
 
293
                status_data = {'acl_id': 100}
 
294
 
 
295
                res = self.callbacks.set_firewall_status(ctx, fw_id,
 
296
                    const.ACTIVE, status_data)
 
297
 
 
298
                # update direction on same port
 
299
                data = {'firewall': {'name': 'firewall_2',
 
300
                    'direction': 'both', 'port_id': port_id}}
 
301
                req = self.new_update_request('firewalls', data,
 
302
                    fw['firewall']['id'])
 
303
                req.environ['neutron.context'] = context.Context(
 
304
                    '', 'test-tenant')
 
305
                res = self.deserialize(self.fmt,
 
306
                req.get_response(self.ext_api))
 
307
 
 
308
                csrfw = self.lookup_firewall_csr_association(ctx,
 
309
                    fw['firewall']['id'])
 
310
 
 
311
                self.assertEqual('firewall_2', res['firewall']['name'])
 
312
                self.assertEqual(port_id, csrfw['port_id'])
 
313
                self.assertEqual('both', csrfw['direction'])
 
314
 
 
315
                # cant be in PENDING_XXX state for delete clean up
 
316
                with ctx.session.begin(subtransactions=True):
 
317
                    fw_db = self.plugin._get_firewall(ctx, fw_id)
 
318
                    fw_db['status'] = const.ACTIVE
 
319
                    ctx.session.flush()
 
320
            self._router_interface_action(
 
321
                'remove',
 
322
                r['router']['id'],
 
323
                s['subnet']['id'],
 
324
                None)
 
325
 
 
326
    def test_update_csr_firewall_port_id(self):
 
327
 
 
328
        with contextlib.nested(
 
329
            self.router(tenant_id=self._tenant_id),
 
330
            self.subnet(),
 
331
            self.subnet(cidr='20.0.0.0/24'),
 
332
        ) as (r, s1, s2):
 
333
 
 
334
            body = self._router_interface_action(
 
335
                'add',
 
336
                r['router']['id'],
 
337
                s1['subnet']['id'],
 
338
                None)
 
339
            port_id1 = body['port_id']
 
340
 
 
341
            body = self._router_interface_action(
 
342
                'add',
 
343
                r['router']['id'],
 
344
                s2['subnet']['id'],
 
345
                None)
 
346
            port_id2 = body['port_id']
 
347
 
 
348
            self.fake_vendor_ext['if_list']['port']['id'] = port_id1
 
349
            self.fake_vendor_ext['if_list']['direction'] = 'inside'
 
350
            self.mock_get_hosting_info.return_value = self.fake_vendor_ext
 
351
 
 
352
            with self.firewall(port_id=port_id1,
 
353
                 direction='both') as fw:
 
354
                ctx = context.get_admin_context()
 
355
                fw_id = fw['firewall']['id']
 
356
                status_data = {'acl_id': 100}
 
357
 
 
358
                res = self.callbacks.set_firewall_status(ctx, fw_id,
 
359
                    const.ACTIVE, status_data)
 
360
 
 
361
                # update direction on same port
 
362
                data = {'firewall': {'name': 'firewall_2',
 
363
                    'direction': 'both', 'port_id': port_id2}}
 
364
                req = self.new_update_request('firewalls', data,
 
365
                    fw['firewall']['id'])
 
366
                req.environ['neutron.context'] = context.Context(
 
367
                    '', 'test-tenant')
 
368
                res = self.deserialize(self.fmt,
 
369
                req.get_response(self.ext_api))
 
370
 
 
371
                csrfw = self.lookup_firewall_csr_association(ctx,
 
372
                    fw['firewall']['id'])
 
373
 
 
374
                self.assertEqual('firewall_2', res['firewall']['name'])
 
375
                self.assertEqual(port_id2, csrfw['port_id'])
 
376
                self.assertEqual('both', csrfw['direction'])
 
377
 
 
378
                # cant be in PENDING_XXX state for delete clean up
 
379
                with ctx.session.begin(subtransactions=True):
 
380
                    fw_db = self.plugin._get_firewall(ctx, fw_id)
 
381
                    fw_db['status'] = const.ACTIVE
 
382
                    ctx.session.flush()
 
383
            self._router_interface_action('remove',
 
384
                r['router']['id'],
 
385
                s1['subnet']['id'],
 
386
                None)
 
387
            self._router_interface_action(
 
388
                'remove',
 
389
                r['router']['id'],
 
390
                s2['subnet']['id'],
 
391
                None)
 
392
 
 
393
    def test_delete_csr_firewall(self):
 
394
 
 
395
        with contextlib.nested(
 
396
            self.router(tenant_id=self._tenant_id),
 
397
            self.subnet(),
 
398
        ) as (r, s):
 
399
            body = self._router_interface_action(
 
400
                'add',
 
401
                r['router']['id'],
 
402
                s['subnet']['id'],
 
403
                None)
 
404
            port_id = body['port_id']
 
405
 
 
406
            self.fake_vendor_ext['if_list']['port']['id'] = port_id
 
407
            self.fake_vendor_ext['if_list']['direction'] = 'inside'
 
408
            self.mock_get_hosting_info.return_value = self.fake_vendor_ext
 
409
 
 
410
            with self.firewall(port_id=port_id,
 
411
                direction='inside', do_delete=False) as fw:
 
412
                fw_id = fw['firewall']['id']
 
413
                ctx = context.get_admin_context()
 
414
                csrfw = self.lookup_firewall_csr_association(ctx,
 
415
                    fw_id)
 
416
                self.assertNotEqual(None, csrfw)
 
417
                req = self.new_delete_request('firewalls', fw_id)
 
418
                req.get_response(self.ext_api)
 
419
                with ctx.session.begin(subtransactions=True):
 
420
                    fw_db = self.plugin._get_firewall(ctx, fw_id)
 
421
                    fw_db['status'] = const.PENDING_DELETE
 
422
                    ctx.session.flush()
 
423
                self.callbacks.firewall_deleted(ctx, fw_id)
 
424
                csrfw = self.lookup_firewall_csr_association(ctx,
 
425
                    fw_id)
 
426
                self.assertEqual(None, csrfw)
 
427
            self._router_interface_action(
 
428
                'remove',
 
429
                r['router']['id'],
 
430
                s['subnet']['id'],
 
431
                None)