~ubuntu-branches/ubuntu/vivid/neutron/vivid-proposed

« back to all changes in this revision

Viewing changes to neutron/tests/unit/plugins/ml2/test_plugin.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2015-04-15 13:59:07 UTC
  • mfrom: (1.1.22)
  • Revision ID: package-import@ubuntu.com-20150415135907-z10fr18evag1ozq3
Tags: 1:2015.1~rc1-0ubuntu1
* New upstream milestone release:
  - debian/control: Update dependencies. 
  - debian/patches/disable-udev-tests.patch: Dropped no longer needed.
  - debian/patches/fixup-driver-test-execution.patch: Dropped no longer needed.
  - debian/patches/skip-iptest.patch: Skip failing test
  - debian/neutron-plugin-openvswitch-agent.install: Added neutron-ovsvapp-agent binary.
  - debian/neutron-plugin-cisco.install: Added neutron-cisco-apic-service-agent and 
    neutron-cisco-apic-host-agent

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2013 OpenStack Foundation
 
2
# All Rights Reserved.
 
3
#
 
4
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 
5
#    not use this file except in compliance with the License. You may obtain
 
6
#    a copy of the License at
 
7
#
 
8
#         http://www.apache.org/licenses/LICENSE-2.0
 
9
#
 
10
#    Unless required by applicable law or agreed to in writing, software
 
11
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
12
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
13
#    License for the specific language governing permissions and limitations
 
14
#    under the License.
 
15
 
 
16
import contextlib
 
17
import functools
 
18
import mock
 
19
import six
 
20
import testtools
 
21
import uuid
 
22
import webob
 
23
 
 
24
import fixtures
 
25
from oslo_db import exception as db_exc
 
26
 
 
27
from neutron.callbacks import registry
 
28
from neutron.common import constants
 
29
from neutron.common import exceptions as exc
 
30
from neutron.common import utils
 
31
from neutron import context
 
32
from neutron.db import api as db_api
 
33
from neutron.db import db_base_plugin_v2 as base_plugin
 
34
from neutron.db import l3_db
 
35
from neutron.extensions import external_net as external_net
 
36
from neutron.extensions import multiprovidernet as mpnet
 
37
from neutron.extensions import portbindings
 
38
from neutron.extensions import providernet as pnet
 
39
from neutron import manager
 
40
from neutron.plugins.common import constants as service_constants
 
41
from neutron.plugins.ml2.common import exceptions as ml2_exc
 
42
from neutron.plugins.ml2 import config
 
43
from neutron.plugins.ml2 import db as ml2_db
 
44
from neutron.plugins.ml2 import driver_api
 
45
from neutron.plugins.ml2 import driver_context
 
46
from neutron.plugins.ml2.drivers import type_vlan
 
47
from neutron.plugins.ml2 import models
 
48
from neutron.plugins.ml2 import plugin as ml2_plugin
 
49
from neutron.tests import base
 
50
from neutron.tests.unit import _test_extension_portbindings as test_bindings
 
51
from neutron.tests.unit.agent import test_securitygroups_rpc as test_sg_rpc
 
52
from neutron.tests.unit.db import test_allowedaddresspairs_db as test_pair
 
53
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
 
54
from neutron.tests.unit.extensions import test_extra_dhcp_opt as test_dhcpopts
 
55
from neutron.tests.unit.plugins.ml2.drivers import mechanism_logger as \
 
56
     mech_logger
 
57
from neutron.tests.unit.plugins.ml2.drivers import mechanism_test as mech_test
 
58
 
 
59
 
 
60
config.cfg.CONF.import_opt('network_vlan_ranges',
 
61
                           'neutron.plugins.ml2.drivers.type_vlan',
 
62
                           group='ml2_type_vlan')
 
63
 
 
64
 
 
65
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
 
66
 
 
67
DEVICE_OWNER_COMPUTE = 'compute:None'
 
68
HOST = 'fake_host'
 
69
 
 
70
 
 
71
# TODO(marun) - Move to somewhere common for reuse
 
72
class PluginConfFixture(fixtures.Fixture):
 
73
    """Plugin configuration shared across the unit and functional tests."""
 
74
 
 
75
    def __init__(self, plugin_name, parent_setup=None):
 
76
        self.plugin_name = plugin_name
 
77
        self.parent_setup = parent_setup
 
78
 
 
79
    def setUp(self):
 
80
        super(PluginConfFixture, self).setUp()
 
81
        if self.parent_setup:
 
82
            self.parent_setup()
 
83
 
 
84
 
 
85
class Ml2ConfFixture(PluginConfFixture):
 
86
 
 
87
    def __init__(self, parent_setup=None):
 
88
        super(Ml2ConfFixture, self).__init__(PLUGIN_NAME, parent_setup)
 
89
 
 
90
 
 
91
class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
 
92
 
 
93
    _mechanism_drivers = ['logger', 'test']
 
94
 
 
95
    def setup_parent(self):
 
96
        """Perform parent setup with the common plugin configuration class."""
 
97
        l3_plugin = ('neutron.tests.unit.extensions.test_l3.'
 
98
                     'TestL3NatServicePlugin')
 
99
        service_plugins = {'l3_plugin_name': l3_plugin}
 
100
        # Ensure that the parent setup can be called without arguments
 
101
        # by the common configuration setUp.
 
102
        parent_setup = functools.partial(
 
103
            super(Ml2PluginV2TestCase, self).setUp,
 
104
            plugin=PLUGIN_NAME,
 
105
            service_plugins=service_plugins,
 
106
        )
 
107
        self.useFixture(Ml2ConfFixture(parent_setup))
 
108
        self.port_create_status = 'DOWN'
 
109
 
 
110
    def setUp(self):
 
111
        # Enable the test mechanism driver to ensure that
 
112
        # we can successfully call through to all mechanism
 
113
        # driver apis.
 
114
        config.cfg.CONF.set_override('mechanism_drivers',
 
115
                                     self._mechanism_drivers,
 
116
                                     group='ml2')
 
117
        self.physnet = 'physnet1'
 
118
        self.vlan_range = '1:100'
 
119
        self.vlan_range2 = '200:300'
 
120
        self.physnet2 = 'physnet2'
 
121
        self.phys_vrange = ':'.join([self.physnet, self.vlan_range])
 
122
        self.phys2_vrange = ':'.join([self.physnet2, self.vlan_range2])
 
123
        config.cfg.CONF.set_override('network_vlan_ranges',
 
124
                                     [self.phys_vrange, self.phys2_vrange],
 
125
                                     group='ml2_type_vlan')
 
126
        self.setup_parent()
 
127
        self.driver = ml2_plugin.Ml2Plugin()
 
128
        self.context = context.get_admin_context()
 
129
 
 
130
 
 
131
class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
 
132
 
 
133
    _mechanism_drivers = ['logger', 'test']
 
134
 
 
135
    def test_bulk_enabled_with_bulk_drivers(self):
 
136
        self.assertFalse(self._skip_native_bulk)
 
137
 
 
138
 
 
139
class TestMl2BasicGet(test_plugin.TestBasicGet,
 
140
                      Ml2PluginV2TestCase):
 
141
    pass
 
142
 
 
143
 
 
144
class TestMl2V2HTTPResponse(test_plugin.TestV2HTTPResponse,
 
145
                            Ml2PluginV2TestCase):
 
146
    pass
 
147
 
 
148
 
 
149
class TestMl2NetworksV2(test_plugin.TestNetworksV2,
 
150
                        Ml2PluginV2TestCase):
 
151
    def setUp(self, plugin=None):
 
152
        super(TestMl2NetworksV2, self).setUp()
 
153
        # provider networks
 
154
        self.pnets = [{'name': 'net1',
 
155
                       pnet.NETWORK_TYPE: 'vlan',
 
156
                       pnet.PHYSICAL_NETWORK: 'physnet1',
 
157
                       pnet.SEGMENTATION_ID: 1,
 
158
                       'tenant_id': 'tenant_one'},
 
159
                      {'name': 'net2',
 
160
                       pnet.NETWORK_TYPE: 'vlan',
 
161
                       pnet.PHYSICAL_NETWORK: 'physnet2',
 
162
                       pnet.SEGMENTATION_ID: 210,
 
163
                       'tenant_id': 'tenant_one'},
 
164
                      {'name': 'net3',
 
165
                       pnet.NETWORK_TYPE: 'vlan',
 
166
                       pnet.PHYSICAL_NETWORK: 'physnet2',
 
167
                       pnet.SEGMENTATION_ID: 220,
 
168
                       'tenant_id': 'tenant_one'}
 
169
                      ]
 
170
        # multiprovider networks
 
171
        self.mp_nets = [{'name': 'net4',
 
172
                         mpnet.SEGMENTS:
 
173
                             [{pnet.NETWORK_TYPE: 'vlan',
 
174
                               pnet.PHYSICAL_NETWORK: 'physnet2',
 
175
                               pnet.SEGMENTATION_ID: 1},
 
176
                              {pnet.NETWORK_TYPE: 'vlan',
 
177
                               pnet.PHYSICAL_NETWORK: 'physnet2',
 
178
                               pnet.SEGMENTATION_ID: 202}],
 
179
                         'tenant_id': 'tenant_one'}
 
180
                        ]
 
181
        self.nets = self.mp_nets + self.pnets
 
182
 
 
183
    def test_port_delete_helper_tolerates_failure(self):
 
184
        plugin = manager.NeutronManager.get_plugin()
 
185
        with mock.patch.object(plugin, "delete_port",
 
186
                               side_effect=exc.PortNotFound(port_id="123")):
 
187
            plugin._delete_ports(None, [mock.MagicMock()])
 
188
 
 
189
    def test_subnet_delete_helper_tolerates_failure(self):
 
190
        plugin = manager.NeutronManager.get_plugin()
 
191
        with mock.patch.object(plugin, "delete_subnet",
 
192
                               side_effect=exc.SubnetNotFound(subnet_id="1")):
 
193
            plugin._delete_subnets(None, [mock.MagicMock()])
 
194
 
 
195
    def _create_and_verify_networks(self, networks):
 
196
        for net_idx, net in enumerate(networks):
 
197
            # create
 
198
            req = self.new_create_request('networks',
 
199
                                          {'network': net})
 
200
            # verify
 
201
            network = self.deserialize(self.fmt,
 
202
                                       req.get_response(self.api))['network']
 
203
            if mpnet.SEGMENTS not in net:
 
204
                for k, v in six.iteritems(net):
 
205
                    self.assertEqual(net[k], network[k])
 
206
                    self.assertNotIn(mpnet.SEGMENTS, network)
 
207
            else:
 
208
                segments = network[mpnet.SEGMENTS]
 
209
                expected_segments = net[mpnet.SEGMENTS]
 
210
                self.assertEqual(len(expected_segments), len(segments))
 
211
                for expected, actual in zip(expected_segments, segments):
 
212
                    self.assertEqual(expected, actual)
 
213
 
 
214
    def _lookup_network_by_segmentation_id(self, seg_id, num_expected_nets):
 
215
        params_str = "%s=%s" % (pnet.SEGMENTATION_ID, seg_id)
 
216
        net_req = self.new_list_request('networks', None,
 
217
                                        params=params_str)
 
218
        networks = self.deserialize(self.fmt, net_req.get_response(self.api))
 
219
        if num_expected_nets:
 
220
            self.assertIsNotNone(networks)
 
221
            self.assertEqual(num_expected_nets, len(networks['networks']))
 
222
        else:
 
223
            self.assertIsNone(networks)
 
224
        return networks
 
225
 
 
226
    def test_list_networks_with_segmentation_id(self):
 
227
        self._create_and_verify_networks(self.pnets)
 
228
        # verify we can find the network that we expect
 
229
        lookup_vlan_id = 1
 
230
        expected_net = [n for n in self.pnets
 
231
                        if n[pnet.SEGMENTATION_ID] == lookup_vlan_id].pop()
 
232
        networks = self._lookup_network_by_segmentation_id(lookup_vlan_id, 1)
 
233
        # verify all provider attributes
 
234
        network = networks['networks'][0]
 
235
        for attr in pnet.ATTRIBUTES:
 
236
            self.assertEqual(expected_net[attr], network[attr])
 
237
 
 
238
    def test_list_mpnetworks_with_segmentation_id(self):
 
239
        self._create_and_verify_networks(self.nets)
 
240
 
 
241
        # get all networks with seg_id=1 (including multisegment networks)
 
242
        lookup_vlan_id = 1
 
243
        networks = self._lookup_network_by_segmentation_id(lookup_vlan_id, 2)
 
244
 
 
245
        # get the mpnet
 
246
        networks = [n for n in networks['networks'] if mpnet.SEGMENTS in n]
 
247
        network = networks.pop()
 
248
        # verify attributes of the looked up item
 
249
        segments = network[mpnet.SEGMENTS]
 
250
        expected_segments = self.mp_nets[0][mpnet.SEGMENTS]
 
251
        self.assertEqual(len(expected_segments), len(segments))
 
252
        for expected, actual in zip(expected_segments, segments):
 
253
            self.assertEqual(expected, actual)
 
254
 
 
255
    def test_create_network_segment_allocation_fails(self):
 
256
        plugin = manager.NeutronManager.get_plugin()
 
257
        with mock.patch.object(plugin.type_manager, 'create_network_segments',
 
258
            side_effect=db_exc.RetryRequest(ValueError())) as f:
 
259
            self.assertRaises(ValueError,
 
260
                              plugin.create_network,
 
261
                              context.get_admin_context(),
 
262
                              {'network': {'tenant_id': 'sometenant',
 
263
                                           'name': 'dummy',
 
264
                                           'admin_state_up': True,
 
265
                                           'shared': False}})
 
266
            self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count)
 
267
 
 
268
 
 
269
class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
 
270
                       Ml2PluginV2TestCase):
 
271
    pass
 
272
 
 
273
 
 
274
class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
 
275
 
 
276
    def test_update_port_status_build(self):
 
277
        with self.port() as port:
 
278
            self.assertEqual('DOWN', port['port']['status'])
 
279
            self.assertEqual('DOWN', self.port_create_status)
 
280
 
 
281
    def test_update_port_status_short_id(self):
 
282
        ctx = context.get_admin_context()
 
283
        plugin = manager.NeutronManager.get_plugin()
 
284
        with self.port() as port:
 
285
            with mock.patch.object(ml2_db, 'get_binding_levels',
 
286
                                   return_value=[]) as mock_gbl:
 
287
                port_id = port['port']['id']
 
288
                short_id = port_id[:11]
 
289
                plugin.update_port_status(ctx, short_id, 'UP')
 
290
                mock_gbl.assert_called_once_with(mock.ANY, port_id, mock.ANY)
 
291
 
 
292
    def test_update_port_mac(self):
 
293
        self.check_update_port_mac(
 
294
            host_arg={portbindings.HOST_ID: HOST},
 
295
            arg_list=(portbindings.HOST_ID,))
 
296
 
 
297
    def test_update_non_existent_port(self):
 
298
        ctx = context.get_admin_context()
 
299
        plugin = manager.NeutronManager.get_plugin()
 
300
        data = {'port': {'admin_state_up': False}}
 
301
        self.assertRaises(exc.PortNotFound, plugin.update_port, ctx,
 
302
                          'invalid-uuid', data)
 
303
 
 
304
    def test_delete_non_existent_port(self):
 
305
        ctx = context.get_admin_context()
 
306
        plugin = manager.NeutronManager.get_plugin()
 
307
        with mock.patch.object(ml2_plugin.LOG, 'debug') as log_debug:
 
308
            plugin.delete_port(ctx, 'invalid-uuid', l3_port_check=False)
 
309
            log_debug.assert_has_calls([
 
310
                mock.call(_("Deleting port %s"), 'invalid-uuid'),
 
311
                mock.call(_("The port '%s' was deleted"), 'invalid-uuid')
 
312
            ])
 
313
 
 
314
    def test_l3_cleanup_on_net_delete(self):
 
315
        l3plugin = manager.NeutronManager.get_service_plugins().get(
 
316
            service_constants.L3_ROUTER_NAT)
 
317
        kwargs = {'arg_list': (external_net.EXTERNAL,),
 
318
                  external_net.EXTERNAL: True}
 
319
        with self.network(**kwargs) as n:
 
320
            with self.subnet(network=n, cidr='200.0.0.0/22'):
 
321
                l3plugin.create_floatingip(
 
322
                    context.get_admin_context(),
 
323
                    {'floatingip': {'floating_network_id': n['network']['id'],
 
324
                                    'tenant_id': n['network']['tenant_id']}}
 
325
                )
 
326
        self._delete('networks', n['network']['id'])
 
327
        flips = l3plugin.get_floatingips(context.get_admin_context())
 
328
        self.assertFalse(flips)
 
329
 
 
330
    def test_create_ports_bulk_port_binding_failure(self):
 
331
        ctx = context.get_admin_context()
 
332
        with self.network() as net:
 
333
            plugin = manager.NeutronManager.get_plugin()
 
334
 
 
335
            with mock.patch.object(plugin, '_bind_port_if_needed',
 
336
                side_effect=ml2_exc.MechanismDriverError(
 
337
                    method='create_port_bulk')) as _bind_port_if_needed:
 
338
 
 
339
                res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
 
340
                                             'test', True, context=ctx)
 
341
 
 
342
                self.assertTrue(_bind_port_if_needed.called)
 
343
                # We expect a 500 as we injected a fault in the plugin
 
344
                self._validate_behavior_on_bulk_failure(
 
345
                    res, 'ports', webob.exc.HTTPServerError.code)
 
346
 
 
347
    def test_create_ports_bulk_with_sec_grp(self):
 
348
        ctx = context.get_admin_context()
 
349
        plugin = manager.NeutronManager.get_plugin()
 
350
        with contextlib.nested(
 
351
            self.network(),
 
352
            mock.patch.object(plugin.notifier,
 
353
                              'security_groups_member_updated'),
 
354
            mock.patch.object(plugin.notifier,
 
355
                              'security_groups_provider_updated')
 
356
        ) as (net, m_upd, p_upd):
 
357
 
 
358
            res = self._create_port_bulk(self.fmt, 3, net['network']['id'],
 
359
                                         'test', True, context=ctx)
 
360
            ports = self.deserialize(self.fmt, res)
 
361
            used_sg = ports['ports'][0]['security_groups']
 
362
            m_upd.assert_called_once_with(ctx, used_sg)
 
363
            self.assertFalse(p_upd.called)
 
364
 
 
365
    def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
 
366
        ctx = context.get_admin_context()
 
367
        plugin = manager.NeutronManager.get_plugin()
 
368
        with contextlib.nested(
 
369
            self.network(),
 
370
            mock.patch.object(plugin.notifier,
 
371
                              'security_groups_member_updated'),
 
372
            mock.patch.object(plugin.notifier,
 
373
                              'security_groups_provider_updated')
 
374
        ) as (net, m_upd, p_upd):
 
375
 
 
376
            net_id = net['network']['id']
 
377
            data = [{
 
378
                    'network_id': net_id,
 
379
                    'tenant_id': self._tenant_id
 
380
                    },
 
381
                    {
 
382
                    'network_id': net_id,
 
383
                    'tenant_id': self._tenant_id,
 
384
                    'device_owner': constants.DEVICE_OWNER_DHCP
 
385
                    }
 
386
                    ]
 
387
 
 
388
            res = self._create_bulk_from_list(self.fmt, 'port',
 
389
                                              data, context=ctx)
 
390
            ports = self.deserialize(self.fmt, res)
 
391
            used_sg = ports['ports'][0]['security_groups']
 
392
            m_upd.assert_called_once_with(ctx, used_sg)
 
393
            p_upd.assert_called_once_with(ctx)
 
394
 
 
395
            m_upd.reset_mock()
 
396
            p_upd.reset_mock()
 
397
            data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
 
398
            self._create_bulk_from_list(self.fmt, 'port',
 
399
                                        data, context=ctx)
 
400
            self.assertFalse(m_upd.called)
 
401
            p_upd.assert_called_once_with(ctx)
 
402
 
 
403
    def test_create_ports_bulk_with_sec_grp_provider_update_ipv6(self):
 
404
        ctx = context.get_admin_context()
 
405
        plugin = manager.NeutronManager.get_plugin()
 
406
        fake_prefix = '2001:db8::/64'
 
407
        fake_gateway = 'fe80::1'
 
408
        with self.network() as net:
 
409
            with contextlib.nested(
 
410
                self.subnet(net, gateway_ip=fake_gateway,
 
411
                            cidr=fake_prefix, ip_version=6),
 
412
                mock.patch.object(
 
413
                    plugin.notifier, 'security_groups_member_updated'),
 
414
                mock.patch.object(
 
415
                    plugin.notifier, 'security_groups_provider_updated')
 
416
            ) as (snet_v6, m_upd, p_upd):
 
417
 
 
418
                net_id = net['network']['id']
 
419
                data = [{
 
420
                        'network_id': net_id,
 
421
                        'tenant_id': self._tenant_id,
 
422
                        'fixed_ips': [{'subnet_id': snet_v6['subnet']['id']}],
 
423
                        'device_owner': constants.DEVICE_OWNER_ROUTER_INTF
 
424
                        }
 
425
                        ]
 
426
                self._create_bulk_from_list(self.fmt, 'port',
 
427
                                            data, context=ctx)
 
428
                self.assertFalse(m_upd.called)
 
429
                p_upd.assert_called_once_with(ctx)
 
430
 
 
431
    def test_delete_port_no_notify_in_disassociate_floatingips(self):
 
432
        ctx = context.get_admin_context()
 
433
        plugin = manager.NeutronManager.get_plugin()
 
434
        l3plugin = manager.NeutronManager.get_service_plugins().get(
 
435
            service_constants.L3_ROUTER_NAT)
 
436
        with contextlib.nested(
 
437
            self.port(),
 
438
            mock.patch.object(l3plugin, 'disassociate_floatingips'),
 
439
            mock.patch.object(registry, 'notify')
 
440
        ) as (port, disassociate_floatingips, notify):
 
441
 
 
442
            port_id = port['port']['id']
 
443
            plugin.delete_port(ctx, port_id)
 
444
 
 
445
            # check that no notification was requested while under
 
446
            # transaction
 
447
            disassociate_floatingips.assert_has_calls([
 
448
                mock.call(ctx, port_id, do_notify=False)
 
449
            ])
 
450
 
 
451
            # check that notifier was still triggered
 
452
            self.assertTrue(notify.call_counts)
 
453
 
 
454
    def test_check_if_compute_port_serviced_by_dvr(self):
 
455
        self.assertTrue(utils.is_dvr_serviced('compute:None'))
 
456
 
 
457
    def test_check_if_lbaas_vip_port_serviced_by_dvr(self):
 
458
        self.assertTrue(utils.is_dvr_serviced(
 
459
            constants.DEVICE_OWNER_LOADBALANCER))
 
460
 
 
461
    def test_check_if_dhcp_port_serviced_by_dvr(self):
 
462
        self.assertTrue(utils.is_dvr_serviced(constants.DEVICE_OWNER_DHCP))
 
463
 
 
464
    def test_check_if_port_not_serviced_by_dvr(self):
 
465
        self.assertFalse(utils.is_dvr_serviced(
 
466
            constants.DEVICE_OWNER_ROUTER_INTF))
 
467
 
 
468
    def test_disassociate_floatingips_do_notify_returns_nothing(self):
 
469
        ctx = context.get_admin_context()
 
470
        l3plugin = manager.NeutronManager.get_service_plugins().get(
 
471
            service_constants.L3_ROUTER_NAT)
 
472
        with self.port() as port:
 
473
 
 
474
            port_id = port['port']['id']
 
475
            # check that nothing is returned when notifications are handled
 
476
            # by the called method
 
477
            self.assertIsNone(l3plugin.disassociate_floatingips(ctx, port_id))
 
478
 
 
479
 
 
480
class TestMl2PluginOnly(Ml2PluginV2TestCase):
 
481
    """For testing methods that don't call drivers"""
 
482
 
 
483
    def _test_check_mac_update_allowed(self, vif_type, expect_change=True):
 
484
        plugin = manager.NeutronManager.get_plugin()
 
485
        port = {'mac_address': "fake_mac", 'id': "fake_id"}
 
486
        if expect_change:
 
487
            new_attrs = {"mac_address": "dummy_mac"}
 
488
        else:
 
489
            new_attrs = {"mac_address": port['mac_address']}
 
490
        binding = mock.Mock()
 
491
        binding.vif_type = vif_type
 
492
        mac_changed = plugin._check_mac_update_allowed(port, new_attrs,
 
493
                                                       binding)
 
494
        self.assertEqual(expect_change, mac_changed)
 
495
 
 
496
    def test_check_mac_update_allowed_if_no_mac_change(self):
 
497
        self._test_check_mac_update_allowed(portbindings.VIF_TYPE_UNBOUND,
 
498
                                            expect_change=False)
 
499
 
 
500
    def test_check_mac_update_allowed_unless_bound(self):
 
501
        with testtools.ExpectedException(exc.PortBound):
 
502
            self._test_check_mac_update_allowed(portbindings.VIF_TYPE_OVS)
 
503
 
 
504
 
 
505
class TestMl2DvrPortsV2(TestMl2PortsV2):
 
506
    def setUp(self):
 
507
        super(TestMl2DvrPortsV2, self).setUp()
 
508
        extensions = ['router',
 
509
                      constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
 
510
                      constants.L3_DISTRIBUTED_EXT_ALIAS]
 
511
        self.plugin = manager.NeutronManager.get_plugin()
 
512
        self.l3plugin = mock.Mock()
 
513
        type(self.l3plugin).supported_extension_aliases = (
 
514
            mock.PropertyMock(return_value=extensions))
 
515
        self.service_plugins = {'L3_ROUTER_NAT': self.l3plugin}
 
516
 
 
517
    def _test_delete_dvr_serviced_port(self, device_owner, floating_ip=False):
 
518
        ns_to_delete = {'host': 'myhost', 'agent_id': 'vm_l3_agent',
 
519
                        'router_id': 'my_router'}
 
520
        fip_set = set()
 
521
        if floating_ip:
 
522
            fip_set.add(ns_to_delete['router_id'])
 
523
 
 
524
        with contextlib.nested(
 
525
            mock.patch.object(manager.NeutronManager,
 
526
                              'get_service_plugins',
 
527
                              return_value=self.service_plugins),
 
528
            self.port(device_owner=device_owner),
 
529
            mock.patch.object(registry, 'notify'),
 
530
            mock.patch.object(self.l3plugin, 'disassociate_floatingips',
 
531
                              return_value=fip_set),
 
532
            mock.patch.object(self.l3plugin, 'dvr_deletens_if_no_port',
 
533
                              return_value=[ns_to_delete]),
 
534
        ) as (get_service_plugin, port, notify, disassociate_floatingips,
 
535
              dvr_delns_ifno_port):
 
536
 
 
537
            port_id = port['port']['id']
 
538
            self.plugin.delete_port(self.context, port_id)
 
539
 
 
540
            self.assertTrue(notify.call_count)
 
541
            dvr_delns_ifno_port.assert_called_once_with(self.context,
 
542
                                                        port['port']['id'])
 
543
 
 
544
    def test_delete_last_vm_port(self):
 
545
        self._test_delete_dvr_serviced_port(device_owner='compute:None')
 
546
 
 
547
    def test_delete_last_vm_port_with_floatingip(self):
 
548
        self._test_delete_dvr_serviced_port(device_owner='compute:None',
 
549
                                            floating_ip=True)
 
550
 
 
551
    def test_delete_lbaas_vip_port(self):
 
552
        self._test_delete_dvr_serviced_port(
 
553
            device_owner=constants.DEVICE_OWNER_LOADBALANCER)
 
554
 
 
555
    def test_concurrent_csnat_port_delete(self):
 
556
        plugin = manager.NeutronManager.get_service_plugins()[
 
557
            service_constants.L3_ROUTER_NAT]
 
558
        r = plugin.create_router(
 
559
            self.context,
 
560
            {'router': {'name': 'router', 'admin_state_up': True}})
 
561
        with self.subnet() as s:
 
562
            p = plugin.add_router_interface(self.context, r['id'],
 
563
                                            {'subnet_id': s['subnet']['id']})
 
564
 
 
565
        # lie to turn the port into an SNAT interface
 
566
        with self.context.session.begin():
 
567
            rp = self.context.session.query(l3_db.RouterPort).filter_by(
 
568
                port_id=p['port_id']).first()
 
569
            rp.port_type = constants.DEVICE_OWNER_ROUTER_SNAT
 
570
 
 
571
        # take the port away before csnat gets a chance to delete it
 
572
        # to simulate a concurrent delete
 
573
        orig_get_ports = plugin._core_plugin.get_ports
 
574
 
 
575
        def get_ports_with_delete_first(*args, **kwargs):
 
576
            plugin._core_plugin.delete_port(self.context,
 
577
                                            p['port_id'],
 
578
                                            l3_port_check=False)
 
579
            return orig_get_ports(*args, **kwargs)
 
580
        plugin._core_plugin.get_ports = get_ports_with_delete_first
 
581
 
 
582
        # This should be able to handle a concurrent delete without raising
 
583
        # an exception
 
584
        router = plugin._get_router(self.context, r['id'])
 
585
        plugin.delete_csnat_router_interface_ports(self.context, router)
 
586
 
 
587
 
 
588
class TestMl2PortBinding(Ml2PluginV2TestCase,
 
589
                         test_bindings.PortBindingsTestCase):
 
590
    # Test case does not set binding:host_id, so ml2 does not attempt
 
591
    # to bind port
 
592
    VIF_TYPE = portbindings.VIF_TYPE_UNBOUND
 
593
    HAS_PORT_FILTER = False
 
594
    ENABLE_SG = True
 
595
    FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
 
596
 
 
597
    def setUp(self, firewall_driver=None):
 
598
        test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
 
599
        config.cfg.CONF.set_override(
 
600
            'enable_security_group', self.ENABLE_SG,
 
601
            group='SECURITYGROUP')
 
602
        super(TestMl2PortBinding, self).setUp()
 
603
 
 
604
    def _check_port_binding_profile(self, port, profile=None):
 
605
        self.assertIn('id', port)
 
606
        self.assertIn(portbindings.PROFILE, port)
 
607
        value = port[portbindings.PROFILE]
 
608
        self.assertEqual(profile or {}, value)
 
609
 
 
610
    def test_create_port_binding_profile(self):
 
611
        self._test_create_port_binding_profile({'a': 1, 'b': 2})
 
612
 
 
613
    def test_update_port_binding_profile(self):
 
614
        self._test_update_port_binding_profile({'c': 3})
 
615
 
 
616
    def test_create_port_binding_profile_too_big(self):
 
617
        s = 'x' * 5000
 
618
        profile_arg = {portbindings.PROFILE: {'d': s}}
 
619
        try:
 
620
            with self.port(expected_res_status=400,
 
621
                           arg_list=(portbindings.PROFILE,),
 
622
                           **profile_arg):
 
623
                pass
 
624
        except webob.exc.HTTPClientError:
 
625
            pass
 
626
 
 
627
    def test_remove_port_binding_profile(self):
 
628
        profile = {'e': 5}
 
629
        profile_arg = {portbindings.PROFILE: profile}
 
630
        with self.port(arg_list=(portbindings.PROFILE,),
 
631
                       **profile_arg) as port:
 
632
            self._check_port_binding_profile(port['port'], profile)
 
633
            port_id = port['port']['id']
 
634
            profile_arg = {portbindings.PROFILE: None}
 
635
            port = self._update('ports', port_id,
 
636
                                {'port': profile_arg})['port']
 
637
            self._check_port_binding_profile(port)
 
638
            port = self._show('ports', port_id)['port']
 
639
            self._check_port_binding_profile(port)
 
640
 
 
641
    def test_return_on_concurrent_delete_and_binding(self):
 
642
        # create a port and delete it so we have an expired mechanism context
 
643
        with self.port() as port:
 
644
            plugin = manager.NeutronManager.get_plugin()
 
645
            binding = ml2_db.get_locked_port_and_binding(self.context.session,
 
646
                                                         port['port']['id'])[1]
 
647
            binding['host'] = 'test'
 
648
            mech_context = driver_context.PortContext(
 
649
                plugin, self.context, port['port'],
 
650
                plugin.get_network(self.context, port['port']['network_id']),
 
651
                binding, None)
 
652
        with contextlib.nested(
 
653
            mock.patch('neutron.plugins.ml2.plugin.'
 
654
                       'db.get_locked_port_and_binding',
 
655
                       return_value=(None, None)),
 
656
            mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._make_port_dict')
 
657
        ) as (glpab_mock, mpd_mock):
 
658
            plugin._bind_port_if_needed(mech_context)
 
659
            # called during deletion to get port
 
660
            self.assertTrue(glpab_mock.mock_calls)
 
661
            # should have returned before calling _make_port_dict
 
662
            self.assertFalse(mpd_mock.mock_calls)
 
663
 
 
664
    def test_port_binding_profile_not_changed(self):
 
665
        profile = {'e': 5}
 
666
        profile_arg = {portbindings.PROFILE: profile}
 
667
        with self.port(arg_list=(portbindings.PROFILE,),
 
668
                       **profile_arg) as port:
 
669
            self._check_port_binding_profile(port['port'], profile)
 
670
            port_id = port['port']['id']
 
671
            state_arg = {'admin_state_up': True}
 
672
            port = self._update('ports', port_id,
 
673
                                {'port': state_arg})['port']
 
674
            self._check_port_binding_profile(port, profile)
 
675
            port = self._show('ports', port_id)['port']
 
676
            self._check_port_binding_profile(port, profile)
 
677
 
 
678
    def test_process_dvr_port_binding_update_router_id(self):
 
679
        host_id = 'host'
 
680
        binding = models.DVRPortBinding(
 
681
                            port_id='port_id',
 
682
                            host=host_id,
 
683
                            router_id='old_router_id',
 
684
                            vif_type=portbindings.VIF_TYPE_OVS,
 
685
                            vnic_type=portbindings.VNIC_NORMAL,
 
686
                            status=constants.PORT_STATUS_DOWN)
 
687
        plugin = manager.NeutronManager.get_plugin()
 
688
        mock_network = {'id': 'net_id'}
 
689
        mock_port = {'id': 'port_id'}
 
690
        context = mock.Mock()
 
691
        new_router_id = 'new_router'
 
692
        attrs = {'device_id': new_router_id, portbindings.HOST_ID: host_id}
 
693
        with mock.patch.object(plugin, '_update_port_dict_binding'):
 
694
            with mock.patch.object(ml2_db, 'get_network_segments',
 
695
                                   return_value=[]):
 
696
                mech_context = driver_context.PortContext(
 
697
                    self, context, mock_port, mock_network, binding, None)
 
698
                plugin._process_dvr_port_binding(mech_context, context, attrs)
 
699
                self.assertEqual(new_router_id,
 
700
                                 mech_context._binding.router_id)
 
701
                self.assertEqual(host_id, mech_context._binding.host)
 
702
 
 
703
    def test_update_dvr_port_binding_on_non_existent_port(self):
 
704
        plugin = manager.NeutronManager.get_plugin()
 
705
        port = {
 
706
            'id': 'foo_port_id',
 
707
            'binding:host_id': 'foo_host',
 
708
        }
 
709
        with mock.patch.object(ml2_db, 'ensure_dvr_port_binding') as mock_dvr:
 
710
            plugin.update_dvr_port_binding(
 
711
                self.context, 'foo_port_id', {'port': port})
 
712
        self.assertFalse(mock_dvr.called)
 
713
 
 
714
 
 
715
class TestMl2PortBindingNoSG(TestMl2PortBinding):
 
716
    HAS_PORT_FILTER = False
 
717
    ENABLE_SG = False
 
718
    FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
 
719
 
 
720
 
 
721
class TestMl2PortBindingHost(Ml2PluginV2TestCase,
 
722
                             test_bindings.PortBindingsHostTestCaseMixin):
 
723
    pass
 
724
 
 
725
 
 
726
class TestMl2PortBindingVnicType(Ml2PluginV2TestCase,
 
727
                                 test_bindings.PortBindingsVnicTestCaseMixin):
 
728
    pass
 
729
 
 
730
 
 
731
class TestMultiSegmentNetworks(Ml2PluginV2TestCase):
 
732
 
 
733
    def setUp(self, plugin=None):
 
734
        super(TestMultiSegmentNetworks, self).setUp()
 
735
 
 
736
    def test_allocate_dynamic_segment(self):
 
737
        data = {'network': {'name': 'net1',
 
738
                            'tenant_id': 'tenant_one'}}
 
739
        network_req = self.new_create_request('networks', data)
 
740
        network = self.deserialize(self.fmt,
 
741
                                   network_req.get_response(self.api))
 
742
        segment = {driver_api.NETWORK_TYPE: 'vlan',
 
743
                   driver_api.PHYSICAL_NETWORK: 'physnet1'}
 
744
        network_id = network['network']['id']
 
745
        self.driver.type_manager.allocate_dynamic_segment(
 
746
            self.context.session, network_id, segment)
 
747
        dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
 
748
                                                     network_id,
 
749
                                                     'physnet1')
 
750
        self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
 
751
        self.assertEqual('physnet1',
 
752
                         dynamic_segment[driver_api.PHYSICAL_NETWORK])
 
753
        self.assertTrue(dynamic_segment[driver_api.SEGMENTATION_ID] > 0)
 
754
        segment2 = {driver_api.NETWORK_TYPE: 'vlan',
 
755
                    driver_api.SEGMENTATION_ID: 1234,
 
756
                    driver_api.PHYSICAL_NETWORK: 'physnet3'}
 
757
        self.driver.type_manager.allocate_dynamic_segment(
 
758
            self.context.session, network_id, segment2)
 
759
        dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
 
760
                                                     network_id,
 
761
                                                     segmentation_id='1234')
 
762
        self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
 
763
        self.assertEqual('physnet3',
 
764
                         dynamic_segment[driver_api.PHYSICAL_NETWORK])
 
765
        self.assertEqual(dynamic_segment[driver_api.SEGMENTATION_ID], 1234)
 
766
 
 
767
    def test_allocate_dynamic_segment_multiple_physnets(self):
 
768
        data = {'network': {'name': 'net1',
 
769
                            'tenant_id': 'tenant_one'}}
 
770
        network_req = self.new_create_request('networks', data)
 
771
        network = self.deserialize(self.fmt,
 
772
                                   network_req.get_response(self.api))
 
773
        segment = {driver_api.NETWORK_TYPE: 'vlan',
 
774
                   driver_api.PHYSICAL_NETWORK: 'physnet1'}
 
775
        network_id = network['network']['id']
 
776
        self.driver.type_manager.allocate_dynamic_segment(
 
777
            self.context.session, network_id, segment)
 
778
        dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
 
779
                                                     network_id,
 
780
                                                     'physnet1')
 
781
        self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
 
782
        self.assertEqual('physnet1',
 
783
                         dynamic_segment[driver_api.PHYSICAL_NETWORK])
 
784
        dynamic_segmentation_id = dynamic_segment[driver_api.SEGMENTATION_ID]
 
785
        self.assertTrue(dynamic_segmentation_id > 0)
 
786
        dynamic_segment1 = ml2_db.get_dynamic_segment(self.context.session,
 
787
                                                      network_id,
 
788
                                                      'physnet1')
 
789
        dynamic_segment1_id = dynamic_segment1[driver_api.SEGMENTATION_ID]
 
790
        self.assertEqual(dynamic_segmentation_id, dynamic_segment1_id)
 
791
        segment2 = {driver_api.NETWORK_TYPE: 'vlan',
 
792
                    driver_api.PHYSICAL_NETWORK: 'physnet2'}
 
793
        self.driver.type_manager.allocate_dynamic_segment(
 
794
            self.context.session, network_id, segment2)
 
795
        dynamic_segment2 = ml2_db.get_dynamic_segment(self.context.session,
 
796
                                                      network_id,
 
797
                                                      'physnet2')
 
798
        dynamic_segmentation2_id = dynamic_segment2[driver_api.SEGMENTATION_ID]
 
799
        self.assertNotEqual(dynamic_segmentation_id, dynamic_segmentation2_id)
 
800
 
 
801
    def test_allocate_release_dynamic_segment(self):
 
802
        data = {'network': {'name': 'net1',
 
803
                            'tenant_id': 'tenant_one'}}
 
804
        network_req = self.new_create_request('networks', data)
 
805
        network = self.deserialize(self.fmt,
 
806
                                   network_req.get_response(self.api))
 
807
        segment = {driver_api.NETWORK_TYPE: 'vlan',
 
808
                   driver_api.PHYSICAL_NETWORK: 'physnet1'}
 
809
        network_id = network['network']['id']
 
810
        self.driver.type_manager.allocate_dynamic_segment(
 
811
            self.context.session, network_id, segment)
 
812
        dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
 
813
                                                     network_id,
 
814
                                                     'physnet1')
 
815
        self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
 
816
        self.assertEqual('physnet1',
 
817
                         dynamic_segment[driver_api.PHYSICAL_NETWORK])
 
818
        dynamic_segmentation_id = dynamic_segment[driver_api.SEGMENTATION_ID]
 
819
        self.assertTrue(dynamic_segmentation_id > 0)
 
820
        self.driver.type_manager.release_dynamic_segment(
 
821
            self.context.session, dynamic_segment[driver_api.ID])
 
822
        self.assertIsNone(ml2_db.get_dynamic_segment(
 
823
            self.context.session, network_id, 'physnet1'))
 
824
 
 
825
    def test_create_network_provider(self):
 
826
        data = {'network': {'name': 'net1',
 
827
                            pnet.NETWORK_TYPE: 'vlan',
 
828
                            pnet.PHYSICAL_NETWORK: 'physnet1',
 
829
                            pnet.SEGMENTATION_ID: 1,
 
830
                            'tenant_id': 'tenant_one'}}
 
831
        network_req = self.new_create_request('networks', data)
 
832
        network = self.deserialize(self.fmt,
 
833
                                   network_req.get_response(self.api))
 
834
        self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
 
835
        self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
 
836
        self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
 
837
        self.assertNotIn(mpnet.SEGMENTS, network['network'])
 
838
 
 
839
    def test_create_network_single_multiprovider(self):
 
840
        data = {'network': {'name': 'net1',
 
841
                            mpnet.SEGMENTS:
 
842
                            [{pnet.NETWORK_TYPE: 'vlan',
 
843
                              pnet.PHYSICAL_NETWORK: 'physnet1',
 
844
                              pnet.SEGMENTATION_ID: 1}],
 
845
                            'tenant_id': 'tenant_one'}}
 
846
        net_req = self.new_create_request('networks', data)
 
847
        network = self.deserialize(self.fmt, net_req.get_response(self.api))
 
848
        self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
 
849
        self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
 
850
        self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
 
851
        self.assertNotIn(mpnet.SEGMENTS, network['network'])
 
852
 
 
853
        # Tests get_network()
 
854
        net_req = self.new_show_request('networks', network['network']['id'])
 
855
        network = self.deserialize(self.fmt, net_req.get_response(self.api))
 
856
        self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
 
857
        self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
 
858
        self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
 
859
        self.assertNotIn(mpnet.SEGMENTS, network['network'])
 
860
 
 
861
    def test_create_network_multiprovider(self):
 
862
        data = {'network': {'name': 'net1',
 
863
                            mpnet.SEGMENTS:
 
864
                            [{pnet.NETWORK_TYPE: 'vlan',
 
865
                              pnet.PHYSICAL_NETWORK: 'physnet1',
 
866
                              pnet.SEGMENTATION_ID: 1},
 
867
                             {pnet.NETWORK_TYPE: 'vlan',
 
868
                              pnet.PHYSICAL_NETWORK: 'physnet1',
 
869
                              pnet.SEGMENTATION_ID: 2}],
 
870
                            'tenant_id': 'tenant_one'}}
 
871
        network_req = self.new_create_request('networks', data)
 
872
        network = self.deserialize(self.fmt,
 
873
                                   network_req.get_response(self.api))
 
874
        segments = network['network'][mpnet.SEGMENTS]
 
875
        for segment_index, segment in enumerate(data['network']
 
876
                                                [mpnet.SEGMENTS]):
 
877
            for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
 
878
                          pnet.SEGMENTATION_ID]:
 
879
                self.assertEqual(segment.get(field),
 
880
                            segments[segment_index][field])
 
881
 
 
882
        # Tests get_network()
 
883
        net_req = self.new_show_request('networks', network['network']['id'])
 
884
        network = self.deserialize(self.fmt, net_req.get_response(self.api))
 
885
        segments = network['network'][mpnet.SEGMENTS]
 
886
        for segment_index, segment in enumerate(data['network']
 
887
                                                [mpnet.SEGMENTS]):
 
888
            for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
 
889
                          pnet.SEGMENTATION_ID]:
 
890
                self.assertEqual(segment.get(field),
 
891
                            segments[segment_index][field])
 
892
 
 
893
    def test_create_network_with_provider_and_multiprovider_fail(self):
 
894
        data = {'network': {'name': 'net1',
 
895
                            mpnet.SEGMENTS:
 
896
                            [{pnet.NETWORK_TYPE: 'vlan',
 
897
                              pnet.PHYSICAL_NETWORK: 'physnet1',
 
898
                              pnet.SEGMENTATION_ID: 1}],
 
899
                            pnet.NETWORK_TYPE: 'vlan',
 
900
                            pnet.PHYSICAL_NETWORK: 'physnet1',
 
901
                            pnet.SEGMENTATION_ID: 1,
 
902
                            'tenant_id': 'tenant_one'}}
 
903
 
 
904
        network_req = self.new_create_request('networks', data)
 
905
        res = network_req.get_response(self.api)
 
906
        self.assertEqual(400, res.status_int)
 
907
 
 
908
    def test_create_network_duplicate_full_segments(self):
 
909
        data = {'network': {'name': 'net1',
 
910
                            mpnet.SEGMENTS:
 
911
                            [{pnet.NETWORK_TYPE: 'vlan',
 
912
                              pnet.PHYSICAL_NETWORK: 'physnet1',
 
913
                              pnet.SEGMENTATION_ID: 1},
 
914
                             {pnet.NETWORK_TYPE: 'vlan',
 
915
                              pnet.PHYSICAL_NETWORK: 'physnet1',
 
916
                              pnet.SEGMENTATION_ID: 1}],
 
917
                            'tenant_id': 'tenant_one'}}
 
918
        network_req = self.new_create_request('networks', data)
 
919
        res = network_req.get_response(self.api)
 
920
        self.assertEqual(400, res.status_int)
 
921
 
 
922
    def test_create_network_duplicate_partial_segments(self):
 
923
        data = {'network': {'name': 'net1',
 
924
                            mpnet.SEGMENTS:
 
925
                            [{pnet.NETWORK_TYPE: 'vlan',
 
926
                              pnet.PHYSICAL_NETWORK: 'physnet1'},
 
927
                             {pnet.NETWORK_TYPE: 'vlan',
 
928
                              pnet.PHYSICAL_NETWORK: 'physnet1'}],
 
929
                            'tenant_id': 'tenant_one'}}
 
930
        network_req = self.new_create_request('networks', data)
 
931
        res = network_req.get_response(self.api)
 
932
        self.assertEqual(201, res.status_int)
 
933
 
 
934
    def test_release_network_segments(self):
 
935
        data = {'network': {'name': 'net1',
 
936
                            'admin_state_up': True,
 
937
                            'shared': False,
 
938
                            pnet.NETWORK_TYPE: 'vlan',
 
939
                            pnet.PHYSICAL_NETWORK: 'physnet1',
 
940
                            pnet.SEGMENTATION_ID: 1,
 
941
                            'tenant_id': 'tenant_one'}}
 
942
        network_req = self.new_create_request('networks', data)
 
943
        res = network_req.get_response(self.api)
 
944
        network = self.deserialize(self.fmt, res)
 
945
        network_id = network['network']['id']
 
946
        segment = {driver_api.NETWORK_TYPE: 'vlan',
 
947
                   driver_api.PHYSICAL_NETWORK: 'physnet2'}
 
948
        self.driver.type_manager.allocate_dynamic_segment(
 
949
            self.context.session, network_id, segment)
 
950
        dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
 
951
                                                     network_id,
 
952
                                                     'physnet2')
 
953
        self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
 
954
        self.assertEqual('physnet2',
 
955
                         dynamic_segment[driver_api.PHYSICAL_NETWORK])
 
956
        self.assertTrue(dynamic_segment[driver_api.SEGMENTATION_ID] > 0)
 
957
 
 
958
        with mock.patch.object(type_vlan.VlanTypeDriver,
 
959
                               'release_segment') as rs:
 
960
            req = self.new_delete_request('networks', network_id)
 
961
            res = req.get_response(self.api)
 
962
            self.assertEqual(2, rs.call_count)
 
963
        self.assertEqual(ml2_db.get_network_segments(
 
964
            self.context.session, network_id), [])
 
965
        self.assertIsNone(ml2_db.get_dynamic_segment(
 
966
            self.context.session, network_id, 'physnet2'))
 
967
 
 
968
    def test_release_segment_no_type_driver(self):
 
969
        data = {'network': {'name': 'net1',
 
970
                            'admin_state_up': True,
 
971
                            'shared': False,
 
972
                            pnet.NETWORK_TYPE: 'vlan',
 
973
                            pnet.PHYSICAL_NETWORK: 'physnet1',
 
974
                            pnet.SEGMENTATION_ID: 1,
 
975
                            'tenant_id': 'tenant_one'}}
 
976
        network_req = self.new_create_request('networks', data)
 
977
        res = network_req.get_response(self.api)
 
978
        network = self.deserialize(self.fmt, res)
 
979
        network_id = network['network']['id']
 
980
 
 
981
        segment = {driver_api.NETWORK_TYPE: 'faketype',
 
982
                   driver_api.PHYSICAL_NETWORK: 'physnet1',
 
983
                   driver_api.ID: 1}
 
984
        with mock.patch('neutron.plugins.ml2.managers.LOG') as log:
 
985
            with mock.patch('neutron.plugins.ml2.managers.db') as db:
 
986
                db.get_network_segments.return_value = (segment,)
 
987
                self.driver.type_manager.release_network_segments(
 
988
                    self.context.session, network_id)
 
989
 
 
990
                log.error.assert_called_once_with(
 
991
                    "Failed to release segment '%s' because "
 
992
                    "network type is not supported.", segment)
 
993
 
 
994
    def test_create_provider_fail(self):
 
995
        segment = {pnet.NETWORK_TYPE: None,
 
996
                   pnet.PHYSICAL_NETWORK: 'phys_net',
 
997
                   pnet.SEGMENTATION_ID: None}
 
998
        with testtools.ExpectedException(exc.InvalidInput):
 
999
            self.driver.type_manager._process_provider_create(segment)
 
1000
 
 
1001
    def test_create_network_plugin(self):
 
1002
        data = {'network': {'name': 'net1',
 
1003
                            'admin_state_up': True,
 
1004
                            'shared': False,
 
1005
                            pnet.NETWORK_TYPE: 'vlan',
 
1006
                            pnet.PHYSICAL_NETWORK: 'physnet1',
 
1007
                            pnet.SEGMENTATION_ID: 1,
 
1008
                            'tenant_id': 'tenant_one'}}
 
1009
 
 
1010
        def raise_mechanism_exc(*args, **kwargs):
 
1011
            raise ml2_exc.MechanismDriverError(
 
1012
                method='create_network_postcommit')
 
1013
 
 
1014
        with mock.patch('neutron.plugins.ml2.managers.MechanismManager.'
 
1015
                        'create_network_precommit', new=raise_mechanism_exc):
 
1016
            with testtools.ExpectedException(ml2_exc.MechanismDriverError):
 
1017
                self.driver.create_network(self.context, data)
 
1018
 
 
1019
    def test_extend_dictionary_no_segments(self):
 
1020
        network = dict(name='net_no_segment', id='5', tenant_id='tenant_one')
 
1021
        self.driver.type_manager.extend_network_dict_provider(self.context,
 
1022
                                                              network)
 
1023
        self.assertIsNone(network[pnet.NETWORK_TYPE])
 
1024
        self.assertIsNone(network[pnet.PHYSICAL_NETWORK])
 
1025
        self.assertIsNone(network[pnet.SEGMENTATION_ID])
 
1026
 
 
1027
 
 
1028
class TestMl2AllowedAddressPairs(Ml2PluginV2TestCase,
 
1029
                                 test_pair.TestAllowedAddressPairs):
 
1030
    def setUp(self, plugin=None):
 
1031
        super(test_pair.TestAllowedAddressPairs, self).setUp(
 
1032
            plugin=PLUGIN_NAME)
 
1033
 
 
1034
 
 
1035
class DHCPOptsTestCase(test_dhcpopts.TestExtraDhcpOpt):
 
1036
 
 
1037
    def setUp(self, plugin=None):
 
1038
        super(test_dhcpopts.ExtraDhcpOptDBTestCase, self).setUp(
 
1039
            plugin=PLUGIN_NAME)
 
1040
 
 
1041
 
 
1042
class Ml2PluginV2FaultyDriverTestCase(test_plugin.NeutronDbPluginV2TestCase):
 
1043
 
 
1044
    def setUp(self):
 
1045
        # Enable the test mechanism driver to ensure that
 
1046
        # we can successfully call through to all mechanism
 
1047
        # driver apis.
 
1048
        config.cfg.CONF.set_override('mechanism_drivers',
 
1049
                                     ['test', 'logger'],
 
1050
                                     group='ml2')
 
1051
        super(Ml2PluginV2FaultyDriverTestCase, self).setUp(PLUGIN_NAME)
 
1052
        self.port_create_status = 'DOWN'
 
1053
 
 
1054
 
 
1055
class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
 
1056
 
 
1057
    def test_create_network_faulty(self):
 
1058
 
 
1059
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1060
                               'create_network_postcommit',
 
1061
                               side_effect=ml2_exc.MechanismDriverError):
 
1062
            tenant_id = str(uuid.uuid4())
 
1063
            data = {'network': {'name': 'net1',
 
1064
                                'tenant_id': tenant_id}}
 
1065
            req = self.new_create_request('networks', data)
 
1066
            res = req.get_response(self.api)
 
1067
            self.assertEqual(500, res.status_int)
 
1068
            error = self.deserialize(self.fmt, res)
 
1069
            self.assertEqual('MechanismDriverError',
 
1070
                             error['NeutronError']['type'])
 
1071
            query_params = "tenant_id=%s" % tenant_id
 
1072
            nets = self._list('networks', query_params=query_params)
 
1073
            self.assertFalse(nets['networks'])
 
1074
 
 
1075
    def test_delete_network_faulty(self):
 
1076
 
 
1077
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1078
                               'delete_network_postcommit',
 
1079
                               side_effect=ml2_exc.MechanismDriverError):
 
1080
            with mock.patch.object(mech_logger.LoggerMechanismDriver,
 
1081
                                   'delete_network_postcommit') as dnp:
 
1082
 
 
1083
                data = {'network': {'name': 'net1',
 
1084
                                    'tenant_id': 'tenant_one'}}
 
1085
                network_req = self.new_create_request('networks', data)
 
1086
                network_res = network_req.get_response(self.api)
 
1087
                self.assertEqual(201, network_res.status_int)
 
1088
                network = self.deserialize(self.fmt, network_res)
 
1089
                net_id = network['network']['id']
 
1090
                req = self.new_delete_request('networks', net_id)
 
1091
                res = req.get_response(self.api)
 
1092
                self.assertEqual(204, res.status_int)
 
1093
                # Test if other mechanism driver was called
 
1094
                self.assertTrue(dnp.called)
 
1095
                self._show('networks', net_id,
 
1096
                           expected_code=webob.exc.HTTPNotFound.code)
 
1097
 
 
1098
    def test_update_network_faulty(self):
 
1099
 
 
1100
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1101
                               'update_network_postcommit',
 
1102
                               side_effect=ml2_exc.MechanismDriverError):
 
1103
            with mock.patch.object(mech_logger.LoggerMechanismDriver,
 
1104
                                   'update_network_postcommit') as unp:
 
1105
 
 
1106
                data = {'network': {'name': 'net1',
 
1107
                                    'tenant_id': 'tenant_one'}}
 
1108
                network_req = self.new_create_request('networks', data)
 
1109
                network_res = network_req.get_response(self.api)
 
1110
                self.assertEqual(201, network_res.status_int)
 
1111
                network = self.deserialize(self.fmt, network_res)
 
1112
                net_id = network['network']['id']
 
1113
 
 
1114
                new_name = 'a_brand_new_name'
 
1115
                data = {'network': {'name': new_name}}
 
1116
                req = self.new_update_request('networks', data, net_id)
 
1117
                res = req.get_response(self.api)
 
1118
                self.assertEqual(500, res.status_int)
 
1119
                error = self.deserialize(self.fmt, res)
 
1120
                self.assertEqual('MechanismDriverError',
 
1121
                                 error['NeutronError']['type'])
 
1122
                # Test if other mechanism driver was called
 
1123
                self.assertTrue(unp.called)
 
1124
                net = self._show('networks', net_id)
 
1125
                self.assertEqual(new_name, net['network']['name'])
 
1126
 
 
1127
                self._delete('networks', net_id)
 
1128
 
 
1129
    def test_create_subnet_faulty(self):
 
1130
 
 
1131
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1132
                               'create_subnet_postcommit',
 
1133
                               side_effect=ml2_exc.MechanismDriverError):
 
1134
 
 
1135
            with self.network() as network:
 
1136
                net_id = network['network']['id']
 
1137
                data = {'subnet': {'network_id': net_id,
 
1138
                                   'cidr': '10.0.20.0/24',
 
1139
                                   'ip_version': '4',
 
1140
                                   'name': 'subnet1',
 
1141
                                   'tenant_id':
 
1142
                                   network['network']['tenant_id'],
 
1143
                                   'gateway_ip': '10.0.20.1'}}
 
1144
                req = self.new_create_request('subnets', data)
 
1145
                res = req.get_response(self.api)
 
1146
                self.assertEqual(500, res.status_int)
 
1147
                error = self.deserialize(self.fmt, res)
 
1148
                self.assertEqual('MechanismDriverError',
 
1149
                                 error['NeutronError']['type'])
 
1150
                query_params = "network_id=%s" % net_id
 
1151
                subnets = self._list('subnets', query_params=query_params)
 
1152
                self.assertFalse(subnets['subnets'])
 
1153
 
 
1154
    def test_delete_subnet_faulty(self):
 
1155
 
 
1156
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1157
                               'delete_subnet_postcommit',
 
1158
                               side_effect=ml2_exc.MechanismDriverError):
 
1159
            with mock.patch.object(mech_logger.LoggerMechanismDriver,
 
1160
                                   'delete_subnet_postcommit') as dsp:
 
1161
 
 
1162
                with self.network() as network:
 
1163
                    data = {'subnet': {'network_id':
 
1164
                                       network['network']['id'],
 
1165
                                       'cidr': '10.0.20.0/24',
 
1166
                                       'ip_version': '4',
 
1167
                                       'name': 'subnet1',
 
1168
                                       'tenant_id':
 
1169
                                       network['network']['tenant_id'],
 
1170
                                       'gateway_ip': '10.0.20.1'}}
 
1171
                    subnet_req = self.new_create_request('subnets', data)
 
1172
                    subnet_res = subnet_req.get_response(self.api)
 
1173
                    self.assertEqual(201, subnet_res.status_int)
 
1174
                    subnet = self.deserialize(self.fmt, subnet_res)
 
1175
                    subnet_id = subnet['subnet']['id']
 
1176
 
 
1177
                    req = self.new_delete_request('subnets', subnet_id)
 
1178
                    res = req.get_response(self.api)
 
1179
                    self.assertEqual(204, res.status_int)
 
1180
                    # Test if other mechanism driver was called
 
1181
                    self.assertTrue(dsp.called)
 
1182
                    self._show('subnets', subnet_id,
 
1183
                               expected_code=webob.exc.HTTPNotFound.code)
 
1184
 
 
1185
    def test_update_subnet_faulty(self):
 
1186
 
 
1187
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1188
                               'update_subnet_postcommit',
 
1189
                               side_effect=ml2_exc.MechanismDriverError):
 
1190
            with mock.patch.object(mech_logger.LoggerMechanismDriver,
 
1191
                                   'update_subnet_postcommit') as usp:
 
1192
 
 
1193
                with self.network() as network:
 
1194
                    data = {'subnet': {'network_id':
 
1195
                                       network['network']['id'],
 
1196
                                       'cidr': '10.0.20.0/24',
 
1197
                                       'ip_version': '4',
 
1198
                                       'name': 'subnet1',
 
1199
                                       'tenant_id':
 
1200
                                       network['network']['tenant_id'],
 
1201
                                       'gateway_ip': '10.0.20.1'}}
 
1202
                    subnet_req = self.new_create_request('subnets', data)
 
1203
                    subnet_res = subnet_req.get_response(self.api)
 
1204
                    self.assertEqual(201, subnet_res.status_int)
 
1205
                    subnet = self.deserialize(self.fmt, subnet_res)
 
1206
                    subnet_id = subnet['subnet']['id']
 
1207
                    new_name = 'a_brand_new_name'
 
1208
                    data = {'subnet': {'name': new_name}}
 
1209
                    req = self.new_update_request('subnets', data, subnet_id)
 
1210
                    res = req.get_response(self.api)
 
1211
                    self.assertEqual(500, res.status_int)
 
1212
                    error = self.deserialize(self.fmt, res)
 
1213
                    self.assertEqual('MechanismDriverError',
 
1214
                                     error['NeutronError']['type'])
 
1215
                    # Test if other mechanism driver was called
 
1216
                    self.assertTrue(usp.called)
 
1217
                    subnet = self._show('subnets', subnet_id)
 
1218
                    self.assertEqual(new_name, subnet['subnet']['name'])
 
1219
 
 
1220
                    self._delete('subnets', subnet['subnet']['id'])
 
1221
 
 
1222
    def test_create_port_faulty(self):
 
1223
 
 
1224
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1225
                               'create_port_postcommit',
 
1226
                               side_effect=ml2_exc.MechanismDriverError):
 
1227
 
 
1228
            with self.network() as network:
 
1229
                net_id = network['network']['id']
 
1230
                data = {'port': {'network_id': net_id,
 
1231
                                 'tenant_id':
 
1232
                                 network['network']['tenant_id'],
 
1233
                                 'name': 'port1',
 
1234
                                 'admin_state_up': 1,
 
1235
                                 'fixed_ips': []}}
 
1236
                req = self.new_create_request('ports', data)
 
1237
                res = req.get_response(self.api)
 
1238
                self.assertEqual(500, res.status_int)
 
1239
                error = self.deserialize(self.fmt, res)
 
1240
                self.assertEqual('MechanismDriverError',
 
1241
                                 error['NeutronError']['type'])
 
1242
                query_params = "network_id=%s" % net_id
 
1243
                ports = self._list('ports', query_params=query_params)
 
1244
                self.assertFalse(ports['ports'])
 
1245
 
 
1246
    def test_update_port_faulty(self):
 
1247
 
 
1248
        with mock.patch.object(mech_test.TestMechanismDriver,
 
1249
                               'update_port_postcommit',
 
1250
                               side_effect=ml2_exc.MechanismDriverError):
 
1251
            with mock.patch.object(mech_logger.LoggerMechanismDriver,
 
1252
                                   'update_port_postcommit') as upp:
 
1253
 
 
1254
                with self.network() as network:
 
1255
                    data = {'port': {'network_id': network['network']['id'],
 
1256
                                     'tenant_id':
 
1257
                                     network['network']['tenant_id'],
 
1258
                                     'name': 'port1',
 
1259
                                     'admin_state_up': 1,
 
1260
                                     'fixed_ips': []}}
 
1261
                    port_req = self.new_create_request('ports', data)
 
1262
                    port_res = port_req.get_response(self.api)
 
1263
                    self.assertEqual(201, port_res.status_int)
 
1264
                    port = self.deserialize(self.fmt, port_res)
 
1265
                    port_id = port['port']['id']
 
1266
 
 
1267
                    new_name = 'a_brand_new_name'
 
1268
                    data = {'port': {'name': new_name}}
 
1269
                    req = self.new_update_request('ports', data, port_id)
 
1270
                    res = req.get_response(self.api)
 
1271
                    self.assertEqual(500, res.status_int)
 
1272
                    error = self.deserialize(self.fmt, res)
 
1273
                    self.assertEqual('MechanismDriverError',
 
1274
                                     error['NeutronError']['type'])
 
1275
                    # Test if other mechanism driver was called
 
1276
                    self.assertTrue(upp.called)
 
1277
                    port = self._show('ports', port_id)
 
1278
                    self.assertEqual(new_name, port['port']['name'])
 
1279
 
 
1280
                    self._delete('ports', port['port']['id'])
 
1281
 
 
1282
 
 
1283
class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
 
1284
    def setUp(self):
 
1285
        super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
 
1286
        self.context = mock.MagicMock()
 
1287
        self.notify_p = mock.patch('neutron.callbacks.registry.notify')
 
1288
        self.notify = self.notify_p.start()
 
1289
 
 
1290
    def _ensure_transaction_is_closed(self):
 
1291
        transaction = self.context.session.begin(subtransactions=True)
 
1292
        enter = transaction.__enter__.call_count
 
1293
        exit = transaction.__exit__.call_count
 
1294
        self.assertEqual(enter, exit)
 
1295
 
 
1296
    def _create_plugin_for_create_update_port(self, new_host_port):
 
1297
        plugin = ml2_plugin.Ml2Plugin()
 
1298
        plugin.extension_manager = mock.Mock()
 
1299
        plugin.type_manager = mock.Mock()
 
1300
        plugin.mechanism_manager = mock.Mock()
 
1301
        plugin.notifier = mock.Mock()
 
1302
        plugin._get_host_port_if_changed = mock.Mock(
 
1303
            return_value=new_host_port)
 
1304
        plugin._check_mac_update_allowed = mock.Mock(return_value=True)
 
1305
 
 
1306
        self.notify.side_effect = (
 
1307
            lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
 
1308
 
 
1309
        return plugin
 
1310
 
 
1311
    def test_create_port_rpc_outside_transaction(self):
 
1312
        with contextlib.nested(
 
1313
            mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
 
1314
            mock.patch.object(base_plugin.NeutronDbPluginV2, 'create_port'),
 
1315
        ) as (init, super_create_port):
 
1316
            init.return_value = None
 
1317
 
 
1318
            new_host_port = mock.Mock()
 
1319
            plugin = self._create_plugin_for_create_update_port(new_host_port)
 
1320
 
 
1321
            plugin.create_port(self.context, mock.MagicMock())
 
1322
 
 
1323
            kwargs = {'context': self.context, 'port': new_host_port}
 
1324
            self.notify.assert_called_once_with('port', 'after_create',
 
1325
                plugin, **kwargs)
 
1326
 
 
1327
    def test_update_port_rpc_outside_transaction(self):
 
1328
        with contextlib.nested(
 
1329
            mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
 
1330
            mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
 
1331
        ) as (init, super_update_port):
 
1332
            init.return_value = None
 
1333
            new_host_port = mock.Mock()
 
1334
            plugin = self._create_plugin_for_create_update_port(new_host_port)
 
1335
 
 
1336
            plugin.update_port(self.context, 'fake_id', mock.MagicMock())
 
1337
 
 
1338
            kwargs = {
 
1339
                'context': self.context,
 
1340
                'port': new_host_port,
 
1341
                'mac_address_updated': True,
 
1342
            }
 
1343
            self.notify.assert_called_once_with('port', 'after_update',
 
1344
                plugin, **kwargs)
 
1345
 
 
1346
    def test_notify_outside_of_delete_transaction(self):
 
1347
        self.notify.side_effect = (
 
1348
            lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
 
1349
        l3plugin = mock.Mock()
 
1350
        l3plugin.supported_extension_aliases = [
 
1351
            'router', constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
 
1352
            constants.L3_DISTRIBUTED_EXT_ALIAS
 
1353
        ]
 
1354
        with contextlib.nested(
 
1355
            mock.patch.object(ml2_plugin.Ml2Plugin, '__init__',
 
1356
                              return_value=None),
 
1357
            mock.patch.object(manager.NeutronManager,
 
1358
                              'get_service_plugins',
 
1359
                              return_value={'L3_ROUTER_NAT': l3plugin}),
 
1360
        ):
 
1361
            plugin = self._create_plugin_for_create_update_port(mock.Mock())
 
1362
            # deleting the port will call registry.notify, which will
 
1363
            # run the transaction balancing function defined in this test
 
1364
            plugin.delete_port(self.context, 'fake_id')
 
1365
            self.assertTrue(self.notify.call_count)