1
# Copyright (c) 2013 OpenStack Foundation
4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
5
# not use this file except in compliance with the License. You may obtain
6
# a copy of the License at
8
# http://www.apache.org/licenses/LICENSE-2.0
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13
# License for the specific language governing permissions and limitations
25
from oslo_db import exception as db_exc
27
from neutron.callbacks import registry
28
from neutron.common import constants
29
from neutron.common import exceptions as exc
30
from neutron.common import utils
31
from neutron import context
32
from neutron.db import api as db_api
33
from neutron.db import db_base_plugin_v2 as base_plugin
34
from neutron.db import l3_db
35
from neutron.extensions import external_net as external_net
36
from neutron.extensions import multiprovidernet as mpnet
37
from neutron.extensions import portbindings
38
from neutron.extensions import providernet as pnet
39
from neutron import manager
40
from neutron.plugins.common import constants as service_constants
41
from neutron.plugins.ml2.common import exceptions as ml2_exc
42
from neutron.plugins.ml2 import config
43
from neutron.plugins.ml2 import db as ml2_db
44
from neutron.plugins.ml2 import driver_api
45
from neutron.plugins.ml2 import driver_context
46
from neutron.plugins.ml2.drivers import type_vlan
47
from neutron.plugins.ml2 import models
48
from neutron.plugins.ml2 import plugin as ml2_plugin
49
from neutron.tests import base
50
from neutron.tests.unit import _test_extension_portbindings as test_bindings
51
from neutron.tests.unit.agent import test_securitygroups_rpc as test_sg_rpc
52
from neutron.tests.unit.db import test_allowedaddresspairs_db as test_pair
53
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin
54
from neutron.tests.unit.extensions import test_extra_dhcp_opt as test_dhcpopts
55
from neutron.tests.unit.plugins.ml2.drivers import mechanism_logger as \
57
from neutron.tests.unit.plugins.ml2.drivers import mechanism_test as mech_test
60
config.cfg.CONF.import_opt('network_vlan_ranges',
61
'neutron.plugins.ml2.drivers.type_vlan',
62
group='ml2_type_vlan')
65
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
67
DEVICE_OWNER_COMPUTE = 'compute:None'
71
# TODO(marun) - Move to somewhere common for reuse
72
class PluginConfFixture(fixtures.Fixture):
73
"""Plugin configuration shared across the unit and functional tests."""
75
def __init__(self, plugin_name, parent_setup=None):
76
self.plugin_name = plugin_name
77
self.parent_setup = parent_setup
80
super(PluginConfFixture, self).setUp()
85
class Ml2ConfFixture(PluginConfFixture):
87
def __init__(self, parent_setup=None):
88
super(Ml2ConfFixture, self).__init__(PLUGIN_NAME, parent_setup)
91
class Ml2PluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
93
_mechanism_drivers = ['logger', 'test']
95
def setup_parent(self):
96
"""Perform parent setup with the common plugin configuration class."""
97
l3_plugin = ('neutron.tests.unit.extensions.test_l3.'
98
'TestL3NatServicePlugin')
99
service_plugins = {'l3_plugin_name': l3_plugin}
100
# Ensure that the parent setup can be called without arguments
101
# by the common configuration setUp.
102
parent_setup = functools.partial(
103
super(Ml2PluginV2TestCase, self).setUp,
105
service_plugins=service_plugins,
107
self.useFixture(Ml2ConfFixture(parent_setup))
108
self.port_create_status = 'DOWN'
111
# Enable the test mechanism driver to ensure that
112
# we can successfully call through to all mechanism
114
config.cfg.CONF.set_override('mechanism_drivers',
115
self._mechanism_drivers,
117
self.physnet = 'physnet1'
118
self.vlan_range = '1:100'
119
self.vlan_range2 = '200:300'
120
self.physnet2 = 'physnet2'
121
self.phys_vrange = ':'.join([self.physnet, self.vlan_range])
122
self.phys2_vrange = ':'.join([self.physnet2, self.vlan_range2])
123
config.cfg.CONF.set_override('network_vlan_ranges',
124
[self.phys_vrange, self.phys2_vrange],
125
group='ml2_type_vlan')
127
self.driver = ml2_plugin.Ml2Plugin()
128
self.context = context.get_admin_context()
131
class TestMl2BulkToggleWithoutBulkless(Ml2PluginV2TestCase):
133
_mechanism_drivers = ['logger', 'test']
135
def test_bulk_enabled_with_bulk_drivers(self):
136
self.assertFalse(self._skip_native_bulk)
139
class TestMl2BasicGet(test_plugin.TestBasicGet,
140
Ml2PluginV2TestCase):
144
class TestMl2V2HTTPResponse(test_plugin.TestV2HTTPResponse,
145
Ml2PluginV2TestCase):
149
class TestMl2NetworksV2(test_plugin.TestNetworksV2,
150
Ml2PluginV2TestCase):
151
def setUp(self, plugin=None):
152
super(TestMl2NetworksV2, self).setUp()
154
self.pnets = [{'name': 'net1',
155
pnet.NETWORK_TYPE: 'vlan',
156
pnet.PHYSICAL_NETWORK: 'physnet1',
157
pnet.SEGMENTATION_ID: 1,
158
'tenant_id': 'tenant_one'},
160
pnet.NETWORK_TYPE: 'vlan',
161
pnet.PHYSICAL_NETWORK: 'physnet2',
162
pnet.SEGMENTATION_ID: 210,
163
'tenant_id': 'tenant_one'},
165
pnet.NETWORK_TYPE: 'vlan',
166
pnet.PHYSICAL_NETWORK: 'physnet2',
167
pnet.SEGMENTATION_ID: 220,
168
'tenant_id': 'tenant_one'}
170
# multiprovider networks
171
self.mp_nets = [{'name': 'net4',
173
[{pnet.NETWORK_TYPE: 'vlan',
174
pnet.PHYSICAL_NETWORK: 'physnet2',
175
pnet.SEGMENTATION_ID: 1},
176
{pnet.NETWORK_TYPE: 'vlan',
177
pnet.PHYSICAL_NETWORK: 'physnet2',
178
pnet.SEGMENTATION_ID: 202}],
179
'tenant_id': 'tenant_one'}
181
self.nets = self.mp_nets + self.pnets
183
def test_port_delete_helper_tolerates_failure(self):
184
plugin = manager.NeutronManager.get_plugin()
185
with mock.patch.object(plugin, "delete_port",
186
side_effect=exc.PortNotFound(port_id="123")):
187
plugin._delete_ports(None, [mock.MagicMock()])
189
def test_subnet_delete_helper_tolerates_failure(self):
190
plugin = manager.NeutronManager.get_plugin()
191
with mock.patch.object(plugin, "delete_subnet",
192
side_effect=exc.SubnetNotFound(subnet_id="1")):
193
plugin._delete_subnets(None, [mock.MagicMock()])
195
def _create_and_verify_networks(self, networks):
196
for net_idx, net in enumerate(networks):
198
req = self.new_create_request('networks',
201
network = self.deserialize(self.fmt,
202
req.get_response(self.api))['network']
203
if mpnet.SEGMENTS not in net:
204
for k, v in six.iteritems(net):
205
self.assertEqual(net[k], network[k])
206
self.assertNotIn(mpnet.SEGMENTS, network)
208
segments = network[mpnet.SEGMENTS]
209
expected_segments = net[mpnet.SEGMENTS]
210
self.assertEqual(len(expected_segments), len(segments))
211
for expected, actual in zip(expected_segments, segments):
212
self.assertEqual(expected, actual)
214
def _lookup_network_by_segmentation_id(self, seg_id, num_expected_nets):
215
params_str = "%s=%s" % (pnet.SEGMENTATION_ID, seg_id)
216
net_req = self.new_list_request('networks', None,
218
networks = self.deserialize(self.fmt, net_req.get_response(self.api))
219
if num_expected_nets:
220
self.assertIsNotNone(networks)
221
self.assertEqual(num_expected_nets, len(networks['networks']))
223
self.assertIsNone(networks)
226
def test_list_networks_with_segmentation_id(self):
227
self._create_and_verify_networks(self.pnets)
228
# verify we can find the network that we expect
230
expected_net = [n for n in self.pnets
231
if n[pnet.SEGMENTATION_ID] == lookup_vlan_id].pop()
232
networks = self._lookup_network_by_segmentation_id(lookup_vlan_id, 1)
233
# verify all provider attributes
234
network = networks['networks'][0]
235
for attr in pnet.ATTRIBUTES:
236
self.assertEqual(expected_net[attr], network[attr])
238
def test_list_mpnetworks_with_segmentation_id(self):
239
self._create_and_verify_networks(self.nets)
241
# get all networks with seg_id=1 (including multisegment networks)
243
networks = self._lookup_network_by_segmentation_id(lookup_vlan_id, 2)
246
networks = [n for n in networks['networks'] if mpnet.SEGMENTS in n]
247
network = networks.pop()
248
# verify attributes of the looked up item
249
segments = network[mpnet.SEGMENTS]
250
expected_segments = self.mp_nets[0][mpnet.SEGMENTS]
251
self.assertEqual(len(expected_segments), len(segments))
252
for expected, actual in zip(expected_segments, segments):
253
self.assertEqual(expected, actual)
255
def test_create_network_segment_allocation_fails(self):
256
plugin = manager.NeutronManager.get_plugin()
257
with mock.patch.object(plugin.type_manager, 'create_network_segments',
258
side_effect=db_exc.RetryRequest(ValueError())) as f:
259
self.assertRaises(ValueError,
260
plugin.create_network,
261
context.get_admin_context(),
262
{'network': {'tenant_id': 'sometenant',
264
'admin_state_up': True,
266
self.assertEqual(db_api.MAX_RETRIES + 1, f.call_count)
269
class TestMl2SubnetsV2(test_plugin.TestSubnetsV2,
270
Ml2PluginV2TestCase):
274
class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
276
def test_update_port_status_build(self):
277
with self.port() as port:
278
self.assertEqual('DOWN', port['port']['status'])
279
self.assertEqual('DOWN', self.port_create_status)
281
def test_update_port_status_short_id(self):
282
ctx = context.get_admin_context()
283
plugin = manager.NeutronManager.get_plugin()
284
with self.port() as port:
285
with mock.patch.object(ml2_db, 'get_binding_levels',
286
return_value=[]) as mock_gbl:
287
port_id = port['port']['id']
288
short_id = port_id[:11]
289
plugin.update_port_status(ctx, short_id, 'UP')
290
mock_gbl.assert_called_once_with(mock.ANY, port_id, mock.ANY)
292
def test_update_port_mac(self):
293
self.check_update_port_mac(
294
host_arg={portbindings.HOST_ID: HOST},
295
arg_list=(portbindings.HOST_ID,))
297
def test_update_non_existent_port(self):
298
ctx = context.get_admin_context()
299
plugin = manager.NeutronManager.get_plugin()
300
data = {'port': {'admin_state_up': False}}
301
self.assertRaises(exc.PortNotFound, plugin.update_port, ctx,
302
'invalid-uuid', data)
304
def test_delete_non_existent_port(self):
305
ctx = context.get_admin_context()
306
plugin = manager.NeutronManager.get_plugin()
307
with mock.patch.object(ml2_plugin.LOG, 'debug') as log_debug:
308
plugin.delete_port(ctx, 'invalid-uuid', l3_port_check=False)
309
log_debug.assert_has_calls([
310
mock.call(_("Deleting port %s"), 'invalid-uuid'),
311
mock.call(_("The port '%s' was deleted"), 'invalid-uuid')
314
def test_l3_cleanup_on_net_delete(self):
315
l3plugin = manager.NeutronManager.get_service_plugins().get(
316
service_constants.L3_ROUTER_NAT)
317
kwargs = {'arg_list': (external_net.EXTERNAL,),
318
external_net.EXTERNAL: True}
319
with self.network(**kwargs) as n:
320
with self.subnet(network=n, cidr='200.0.0.0/22'):
321
l3plugin.create_floatingip(
322
context.get_admin_context(),
323
{'floatingip': {'floating_network_id': n['network']['id'],
324
'tenant_id': n['network']['tenant_id']}}
326
self._delete('networks', n['network']['id'])
327
flips = l3plugin.get_floatingips(context.get_admin_context())
328
self.assertFalse(flips)
330
def test_create_ports_bulk_port_binding_failure(self):
331
ctx = context.get_admin_context()
332
with self.network() as net:
333
plugin = manager.NeutronManager.get_plugin()
335
with mock.patch.object(plugin, '_bind_port_if_needed',
336
side_effect=ml2_exc.MechanismDriverError(
337
method='create_port_bulk')) as _bind_port_if_needed:
339
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
340
'test', True, context=ctx)
342
self.assertTrue(_bind_port_if_needed.called)
343
# We expect a 500 as we injected a fault in the plugin
344
self._validate_behavior_on_bulk_failure(
345
res, 'ports', webob.exc.HTTPServerError.code)
347
def test_create_ports_bulk_with_sec_grp(self):
348
ctx = context.get_admin_context()
349
plugin = manager.NeutronManager.get_plugin()
350
with contextlib.nested(
352
mock.patch.object(plugin.notifier,
353
'security_groups_member_updated'),
354
mock.patch.object(plugin.notifier,
355
'security_groups_provider_updated')
356
) as (net, m_upd, p_upd):
358
res = self._create_port_bulk(self.fmt, 3, net['network']['id'],
359
'test', True, context=ctx)
360
ports = self.deserialize(self.fmt, res)
361
used_sg = ports['ports'][0]['security_groups']
362
m_upd.assert_called_once_with(ctx, used_sg)
363
self.assertFalse(p_upd.called)
365
def test_create_ports_bulk_with_sec_grp_member_provider_update(self):
366
ctx = context.get_admin_context()
367
plugin = manager.NeutronManager.get_plugin()
368
with contextlib.nested(
370
mock.patch.object(plugin.notifier,
371
'security_groups_member_updated'),
372
mock.patch.object(plugin.notifier,
373
'security_groups_provider_updated')
374
) as (net, m_upd, p_upd):
376
net_id = net['network']['id']
378
'network_id': net_id,
379
'tenant_id': self._tenant_id
382
'network_id': net_id,
383
'tenant_id': self._tenant_id,
384
'device_owner': constants.DEVICE_OWNER_DHCP
388
res = self._create_bulk_from_list(self.fmt, 'port',
390
ports = self.deserialize(self.fmt, res)
391
used_sg = ports['ports'][0]['security_groups']
392
m_upd.assert_called_once_with(ctx, used_sg)
393
p_upd.assert_called_once_with(ctx)
397
data[0]['device_owner'] = constants.DEVICE_OWNER_DHCP
398
self._create_bulk_from_list(self.fmt, 'port',
400
self.assertFalse(m_upd.called)
401
p_upd.assert_called_once_with(ctx)
403
def test_create_ports_bulk_with_sec_grp_provider_update_ipv6(self):
404
ctx = context.get_admin_context()
405
plugin = manager.NeutronManager.get_plugin()
406
fake_prefix = '2001:db8::/64'
407
fake_gateway = 'fe80::1'
408
with self.network() as net:
409
with contextlib.nested(
410
self.subnet(net, gateway_ip=fake_gateway,
411
cidr=fake_prefix, ip_version=6),
413
plugin.notifier, 'security_groups_member_updated'),
415
plugin.notifier, 'security_groups_provider_updated')
416
) as (snet_v6, m_upd, p_upd):
418
net_id = net['network']['id']
420
'network_id': net_id,
421
'tenant_id': self._tenant_id,
422
'fixed_ips': [{'subnet_id': snet_v6['subnet']['id']}],
423
'device_owner': constants.DEVICE_OWNER_ROUTER_INTF
426
self._create_bulk_from_list(self.fmt, 'port',
428
self.assertFalse(m_upd.called)
429
p_upd.assert_called_once_with(ctx)
431
def test_delete_port_no_notify_in_disassociate_floatingips(self):
432
ctx = context.get_admin_context()
433
plugin = manager.NeutronManager.get_plugin()
434
l3plugin = manager.NeutronManager.get_service_plugins().get(
435
service_constants.L3_ROUTER_NAT)
436
with contextlib.nested(
438
mock.patch.object(l3plugin, 'disassociate_floatingips'),
439
mock.patch.object(registry, 'notify')
440
) as (port, disassociate_floatingips, notify):
442
port_id = port['port']['id']
443
plugin.delete_port(ctx, port_id)
445
# check that no notification was requested while under
447
disassociate_floatingips.assert_has_calls([
448
mock.call(ctx, port_id, do_notify=False)
451
# check that notifier was still triggered
452
self.assertTrue(notify.call_counts)
454
def test_check_if_compute_port_serviced_by_dvr(self):
455
self.assertTrue(utils.is_dvr_serviced('compute:None'))
457
def test_check_if_lbaas_vip_port_serviced_by_dvr(self):
458
self.assertTrue(utils.is_dvr_serviced(
459
constants.DEVICE_OWNER_LOADBALANCER))
461
def test_check_if_dhcp_port_serviced_by_dvr(self):
462
self.assertTrue(utils.is_dvr_serviced(constants.DEVICE_OWNER_DHCP))
464
def test_check_if_port_not_serviced_by_dvr(self):
465
self.assertFalse(utils.is_dvr_serviced(
466
constants.DEVICE_OWNER_ROUTER_INTF))
468
def test_disassociate_floatingips_do_notify_returns_nothing(self):
469
ctx = context.get_admin_context()
470
l3plugin = manager.NeutronManager.get_service_plugins().get(
471
service_constants.L3_ROUTER_NAT)
472
with self.port() as port:
474
port_id = port['port']['id']
475
# check that nothing is returned when notifications are handled
476
# by the called method
477
self.assertIsNone(l3plugin.disassociate_floatingips(ctx, port_id))
480
class TestMl2PluginOnly(Ml2PluginV2TestCase):
481
"""For testing methods that don't call drivers"""
483
def _test_check_mac_update_allowed(self, vif_type, expect_change=True):
484
plugin = manager.NeutronManager.get_plugin()
485
port = {'mac_address': "fake_mac", 'id': "fake_id"}
487
new_attrs = {"mac_address": "dummy_mac"}
489
new_attrs = {"mac_address": port['mac_address']}
490
binding = mock.Mock()
491
binding.vif_type = vif_type
492
mac_changed = plugin._check_mac_update_allowed(port, new_attrs,
494
self.assertEqual(expect_change, mac_changed)
496
def test_check_mac_update_allowed_if_no_mac_change(self):
497
self._test_check_mac_update_allowed(portbindings.VIF_TYPE_UNBOUND,
500
def test_check_mac_update_allowed_unless_bound(self):
501
with testtools.ExpectedException(exc.PortBound):
502
self._test_check_mac_update_allowed(portbindings.VIF_TYPE_OVS)
505
class TestMl2DvrPortsV2(TestMl2PortsV2):
507
super(TestMl2DvrPortsV2, self).setUp()
508
extensions = ['router',
509
constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
510
constants.L3_DISTRIBUTED_EXT_ALIAS]
511
self.plugin = manager.NeutronManager.get_plugin()
512
self.l3plugin = mock.Mock()
513
type(self.l3plugin).supported_extension_aliases = (
514
mock.PropertyMock(return_value=extensions))
515
self.service_plugins = {'L3_ROUTER_NAT': self.l3plugin}
517
def _test_delete_dvr_serviced_port(self, device_owner, floating_ip=False):
518
ns_to_delete = {'host': 'myhost', 'agent_id': 'vm_l3_agent',
519
'router_id': 'my_router'}
522
fip_set.add(ns_to_delete['router_id'])
524
with contextlib.nested(
525
mock.patch.object(manager.NeutronManager,
526
'get_service_plugins',
527
return_value=self.service_plugins),
528
self.port(device_owner=device_owner),
529
mock.patch.object(registry, 'notify'),
530
mock.patch.object(self.l3plugin, 'disassociate_floatingips',
531
return_value=fip_set),
532
mock.patch.object(self.l3plugin, 'dvr_deletens_if_no_port',
533
return_value=[ns_to_delete]),
534
) as (get_service_plugin, port, notify, disassociate_floatingips,
535
dvr_delns_ifno_port):
537
port_id = port['port']['id']
538
self.plugin.delete_port(self.context, port_id)
540
self.assertTrue(notify.call_count)
541
dvr_delns_ifno_port.assert_called_once_with(self.context,
544
def test_delete_last_vm_port(self):
545
self._test_delete_dvr_serviced_port(device_owner='compute:None')
547
def test_delete_last_vm_port_with_floatingip(self):
548
self._test_delete_dvr_serviced_port(device_owner='compute:None',
551
def test_delete_lbaas_vip_port(self):
552
self._test_delete_dvr_serviced_port(
553
device_owner=constants.DEVICE_OWNER_LOADBALANCER)
555
def test_concurrent_csnat_port_delete(self):
556
plugin = manager.NeutronManager.get_service_plugins()[
557
service_constants.L3_ROUTER_NAT]
558
r = plugin.create_router(
560
{'router': {'name': 'router', 'admin_state_up': True}})
561
with self.subnet() as s:
562
p = plugin.add_router_interface(self.context, r['id'],
563
{'subnet_id': s['subnet']['id']})
565
# lie to turn the port into an SNAT interface
566
with self.context.session.begin():
567
rp = self.context.session.query(l3_db.RouterPort).filter_by(
568
port_id=p['port_id']).first()
569
rp.port_type = constants.DEVICE_OWNER_ROUTER_SNAT
571
# take the port away before csnat gets a chance to delete it
572
# to simulate a concurrent delete
573
orig_get_ports = plugin._core_plugin.get_ports
575
def get_ports_with_delete_first(*args, **kwargs):
576
plugin._core_plugin.delete_port(self.context,
579
return orig_get_ports(*args, **kwargs)
580
plugin._core_plugin.get_ports = get_ports_with_delete_first
582
# This should be able to handle a concurrent delete without raising
584
router = plugin._get_router(self.context, r['id'])
585
plugin.delete_csnat_router_interface_ports(self.context, router)
588
class TestMl2PortBinding(Ml2PluginV2TestCase,
589
test_bindings.PortBindingsTestCase):
590
# Test case does not set binding:host_id, so ml2 does not attempt
592
VIF_TYPE = portbindings.VIF_TYPE_UNBOUND
593
HAS_PORT_FILTER = False
595
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_HYBRID_DRIVER
597
def setUp(self, firewall_driver=None):
598
test_sg_rpc.set_firewall_driver(self.FIREWALL_DRIVER)
599
config.cfg.CONF.set_override(
600
'enable_security_group', self.ENABLE_SG,
601
group='SECURITYGROUP')
602
super(TestMl2PortBinding, self).setUp()
604
def _check_port_binding_profile(self, port, profile=None):
605
self.assertIn('id', port)
606
self.assertIn(portbindings.PROFILE, port)
607
value = port[portbindings.PROFILE]
608
self.assertEqual(profile or {}, value)
610
def test_create_port_binding_profile(self):
611
self._test_create_port_binding_profile({'a': 1, 'b': 2})
613
def test_update_port_binding_profile(self):
614
self._test_update_port_binding_profile({'c': 3})
616
def test_create_port_binding_profile_too_big(self):
618
profile_arg = {portbindings.PROFILE: {'d': s}}
620
with self.port(expected_res_status=400,
621
arg_list=(portbindings.PROFILE,),
624
except webob.exc.HTTPClientError:
627
def test_remove_port_binding_profile(self):
629
profile_arg = {portbindings.PROFILE: profile}
630
with self.port(arg_list=(portbindings.PROFILE,),
631
**profile_arg) as port:
632
self._check_port_binding_profile(port['port'], profile)
633
port_id = port['port']['id']
634
profile_arg = {portbindings.PROFILE: None}
635
port = self._update('ports', port_id,
636
{'port': profile_arg})['port']
637
self._check_port_binding_profile(port)
638
port = self._show('ports', port_id)['port']
639
self._check_port_binding_profile(port)
641
def test_return_on_concurrent_delete_and_binding(self):
642
# create a port and delete it so we have an expired mechanism context
643
with self.port() as port:
644
plugin = manager.NeutronManager.get_plugin()
645
binding = ml2_db.get_locked_port_and_binding(self.context.session,
646
port['port']['id'])[1]
647
binding['host'] = 'test'
648
mech_context = driver_context.PortContext(
649
plugin, self.context, port['port'],
650
plugin.get_network(self.context, port['port']['network_id']),
652
with contextlib.nested(
653
mock.patch('neutron.plugins.ml2.plugin.'
654
'db.get_locked_port_and_binding',
655
return_value=(None, None)),
656
mock.patch('neutron.plugins.ml2.plugin.Ml2Plugin._make_port_dict')
657
) as (glpab_mock, mpd_mock):
658
plugin._bind_port_if_needed(mech_context)
659
# called during deletion to get port
660
self.assertTrue(glpab_mock.mock_calls)
661
# should have returned before calling _make_port_dict
662
self.assertFalse(mpd_mock.mock_calls)
664
def test_port_binding_profile_not_changed(self):
666
profile_arg = {portbindings.PROFILE: profile}
667
with self.port(arg_list=(portbindings.PROFILE,),
668
**profile_arg) as port:
669
self._check_port_binding_profile(port['port'], profile)
670
port_id = port['port']['id']
671
state_arg = {'admin_state_up': True}
672
port = self._update('ports', port_id,
673
{'port': state_arg})['port']
674
self._check_port_binding_profile(port, profile)
675
port = self._show('ports', port_id)['port']
676
self._check_port_binding_profile(port, profile)
678
def test_process_dvr_port_binding_update_router_id(self):
680
binding = models.DVRPortBinding(
683
router_id='old_router_id',
684
vif_type=portbindings.VIF_TYPE_OVS,
685
vnic_type=portbindings.VNIC_NORMAL,
686
status=constants.PORT_STATUS_DOWN)
687
plugin = manager.NeutronManager.get_plugin()
688
mock_network = {'id': 'net_id'}
689
mock_port = {'id': 'port_id'}
690
context = mock.Mock()
691
new_router_id = 'new_router'
692
attrs = {'device_id': new_router_id, portbindings.HOST_ID: host_id}
693
with mock.patch.object(plugin, '_update_port_dict_binding'):
694
with mock.patch.object(ml2_db, 'get_network_segments',
696
mech_context = driver_context.PortContext(
697
self, context, mock_port, mock_network, binding, None)
698
plugin._process_dvr_port_binding(mech_context, context, attrs)
699
self.assertEqual(new_router_id,
700
mech_context._binding.router_id)
701
self.assertEqual(host_id, mech_context._binding.host)
703
def test_update_dvr_port_binding_on_non_existent_port(self):
704
plugin = manager.NeutronManager.get_plugin()
707
'binding:host_id': 'foo_host',
709
with mock.patch.object(ml2_db, 'ensure_dvr_port_binding') as mock_dvr:
710
plugin.update_dvr_port_binding(
711
self.context, 'foo_port_id', {'port': port})
712
self.assertFalse(mock_dvr.called)
715
class TestMl2PortBindingNoSG(TestMl2PortBinding):
716
HAS_PORT_FILTER = False
718
FIREWALL_DRIVER = test_sg_rpc.FIREWALL_NOOP_DRIVER
721
class TestMl2PortBindingHost(Ml2PluginV2TestCase,
722
test_bindings.PortBindingsHostTestCaseMixin):
726
class TestMl2PortBindingVnicType(Ml2PluginV2TestCase,
727
test_bindings.PortBindingsVnicTestCaseMixin):
731
class TestMultiSegmentNetworks(Ml2PluginV2TestCase):
733
def setUp(self, plugin=None):
734
super(TestMultiSegmentNetworks, self).setUp()
736
def test_allocate_dynamic_segment(self):
737
data = {'network': {'name': 'net1',
738
'tenant_id': 'tenant_one'}}
739
network_req = self.new_create_request('networks', data)
740
network = self.deserialize(self.fmt,
741
network_req.get_response(self.api))
742
segment = {driver_api.NETWORK_TYPE: 'vlan',
743
driver_api.PHYSICAL_NETWORK: 'physnet1'}
744
network_id = network['network']['id']
745
self.driver.type_manager.allocate_dynamic_segment(
746
self.context.session, network_id, segment)
747
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
750
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
751
self.assertEqual('physnet1',
752
dynamic_segment[driver_api.PHYSICAL_NETWORK])
753
self.assertTrue(dynamic_segment[driver_api.SEGMENTATION_ID] > 0)
754
segment2 = {driver_api.NETWORK_TYPE: 'vlan',
755
driver_api.SEGMENTATION_ID: 1234,
756
driver_api.PHYSICAL_NETWORK: 'physnet3'}
757
self.driver.type_manager.allocate_dynamic_segment(
758
self.context.session, network_id, segment2)
759
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
761
segmentation_id='1234')
762
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
763
self.assertEqual('physnet3',
764
dynamic_segment[driver_api.PHYSICAL_NETWORK])
765
self.assertEqual(dynamic_segment[driver_api.SEGMENTATION_ID], 1234)
767
def test_allocate_dynamic_segment_multiple_physnets(self):
768
data = {'network': {'name': 'net1',
769
'tenant_id': 'tenant_one'}}
770
network_req = self.new_create_request('networks', data)
771
network = self.deserialize(self.fmt,
772
network_req.get_response(self.api))
773
segment = {driver_api.NETWORK_TYPE: 'vlan',
774
driver_api.PHYSICAL_NETWORK: 'physnet1'}
775
network_id = network['network']['id']
776
self.driver.type_manager.allocate_dynamic_segment(
777
self.context.session, network_id, segment)
778
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
781
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
782
self.assertEqual('physnet1',
783
dynamic_segment[driver_api.PHYSICAL_NETWORK])
784
dynamic_segmentation_id = dynamic_segment[driver_api.SEGMENTATION_ID]
785
self.assertTrue(dynamic_segmentation_id > 0)
786
dynamic_segment1 = ml2_db.get_dynamic_segment(self.context.session,
789
dynamic_segment1_id = dynamic_segment1[driver_api.SEGMENTATION_ID]
790
self.assertEqual(dynamic_segmentation_id, dynamic_segment1_id)
791
segment2 = {driver_api.NETWORK_TYPE: 'vlan',
792
driver_api.PHYSICAL_NETWORK: 'physnet2'}
793
self.driver.type_manager.allocate_dynamic_segment(
794
self.context.session, network_id, segment2)
795
dynamic_segment2 = ml2_db.get_dynamic_segment(self.context.session,
798
dynamic_segmentation2_id = dynamic_segment2[driver_api.SEGMENTATION_ID]
799
self.assertNotEqual(dynamic_segmentation_id, dynamic_segmentation2_id)
801
def test_allocate_release_dynamic_segment(self):
802
data = {'network': {'name': 'net1',
803
'tenant_id': 'tenant_one'}}
804
network_req = self.new_create_request('networks', data)
805
network = self.deserialize(self.fmt,
806
network_req.get_response(self.api))
807
segment = {driver_api.NETWORK_TYPE: 'vlan',
808
driver_api.PHYSICAL_NETWORK: 'physnet1'}
809
network_id = network['network']['id']
810
self.driver.type_manager.allocate_dynamic_segment(
811
self.context.session, network_id, segment)
812
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
815
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
816
self.assertEqual('physnet1',
817
dynamic_segment[driver_api.PHYSICAL_NETWORK])
818
dynamic_segmentation_id = dynamic_segment[driver_api.SEGMENTATION_ID]
819
self.assertTrue(dynamic_segmentation_id > 0)
820
self.driver.type_manager.release_dynamic_segment(
821
self.context.session, dynamic_segment[driver_api.ID])
822
self.assertIsNone(ml2_db.get_dynamic_segment(
823
self.context.session, network_id, 'physnet1'))
825
def test_create_network_provider(self):
826
data = {'network': {'name': 'net1',
827
pnet.NETWORK_TYPE: 'vlan',
828
pnet.PHYSICAL_NETWORK: 'physnet1',
829
pnet.SEGMENTATION_ID: 1,
830
'tenant_id': 'tenant_one'}}
831
network_req = self.new_create_request('networks', data)
832
network = self.deserialize(self.fmt,
833
network_req.get_response(self.api))
834
self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
835
self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
836
self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
837
self.assertNotIn(mpnet.SEGMENTS, network['network'])
839
def test_create_network_single_multiprovider(self):
840
data = {'network': {'name': 'net1',
842
[{pnet.NETWORK_TYPE: 'vlan',
843
pnet.PHYSICAL_NETWORK: 'physnet1',
844
pnet.SEGMENTATION_ID: 1}],
845
'tenant_id': 'tenant_one'}}
846
net_req = self.new_create_request('networks', data)
847
network = self.deserialize(self.fmt, net_req.get_response(self.api))
848
self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
849
self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
850
self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
851
self.assertNotIn(mpnet.SEGMENTS, network['network'])
853
# Tests get_network()
854
net_req = self.new_show_request('networks', network['network']['id'])
855
network = self.deserialize(self.fmt, net_req.get_response(self.api))
856
self.assertEqual('vlan', network['network'][pnet.NETWORK_TYPE])
857
self.assertEqual('physnet1', network['network'][pnet.PHYSICAL_NETWORK])
858
self.assertEqual(1, network['network'][pnet.SEGMENTATION_ID])
859
self.assertNotIn(mpnet.SEGMENTS, network['network'])
861
def test_create_network_multiprovider(self):
862
data = {'network': {'name': 'net1',
864
[{pnet.NETWORK_TYPE: 'vlan',
865
pnet.PHYSICAL_NETWORK: 'physnet1',
866
pnet.SEGMENTATION_ID: 1},
867
{pnet.NETWORK_TYPE: 'vlan',
868
pnet.PHYSICAL_NETWORK: 'physnet1',
869
pnet.SEGMENTATION_ID: 2}],
870
'tenant_id': 'tenant_one'}}
871
network_req = self.new_create_request('networks', data)
872
network = self.deserialize(self.fmt,
873
network_req.get_response(self.api))
874
segments = network['network'][mpnet.SEGMENTS]
875
for segment_index, segment in enumerate(data['network']
877
for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
878
pnet.SEGMENTATION_ID]:
879
self.assertEqual(segment.get(field),
880
segments[segment_index][field])
882
# Tests get_network()
883
net_req = self.new_show_request('networks', network['network']['id'])
884
network = self.deserialize(self.fmt, net_req.get_response(self.api))
885
segments = network['network'][mpnet.SEGMENTS]
886
for segment_index, segment in enumerate(data['network']
888
for field in [pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK,
889
pnet.SEGMENTATION_ID]:
890
self.assertEqual(segment.get(field),
891
segments[segment_index][field])
893
def test_create_network_with_provider_and_multiprovider_fail(self):
894
data = {'network': {'name': 'net1',
896
[{pnet.NETWORK_TYPE: 'vlan',
897
pnet.PHYSICAL_NETWORK: 'physnet1',
898
pnet.SEGMENTATION_ID: 1}],
899
pnet.NETWORK_TYPE: 'vlan',
900
pnet.PHYSICAL_NETWORK: 'physnet1',
901
pnet.SEGMENTATION_ID: 1,
902
'tenant_id': 'tenant_one'}}
904
network_req = self.new_create_request('networks', data)
905
res = network_req.get_response(self.api)
906
self.assertEqual(400, res.status_int)
908
def test_create_network_duplicate_full_segments(self):
909
data = {'network': {'name': 'net1',
911
[{pnet.NETWORK_TYPE: 'vlan',
912
pnet.PHYSICAL_NETWORK: 'physnet1',
913
pnet.SEGMENTATION_ID: 1},
914
{pnet.NETWORK_TYPE: 'vlan',
915
pnet.PHYSICAL_NETWORK: 'physnet1',
916
pnet.SEGMENTATION_ID: 1}],
917
'tenant_id': 'tenant_one'}}
918
network_req = self.new_create_request('networks', data)
919
res = network_req.get_response(self.api)
920
self.assertEqual(400, res.status_int)
922
def test_create_network_duplicate_partial_segments(self):
923
data = {'network': {'name': 'net1',
925
[{pnet.NETWORK_TYPE: 'vlan',
926
pnet.PHYSICAL_NETWORK: 'physnet1'},
927
{pnet.NETWORK_TYPE: 'vlan',
928
pnet.PHYSICAL_NETWORK: 'physnet1'}],
929
'tenant_id': 'tenant_one'}}
930
network_req = self.new_create_request('networks', data)
931
res = network_req.get_response(self.api)
932
self.assertEqual(201, res.status_int)
934
def test_release_network_segments(self):
935
data = {'network': {'name': 'net1',
936
'admin_state_up': True,
938
pnet.NETWORK_TYPE: 'vlan',
939
pnet.PHYSICAL_NETWORK: 'physnet1',
940
pnet.SEGMENTATION_ID: 1,
941
'tenant_id': 'tenant_one'}}
942
network_req = self.new_create_request('networks', data)
943
res = network_req.get_response(self.api)
944
network = self.deserialize(self.fmt, res)
945
network_id = network['network']['id']
946
segment = {driver_api.NETWORK_TYPE: 'vlan',
947
driver_api.PHYSICAL_NETWORK: 'physnet2'}
948
self.driver.type_manager.allocate_dynamic_segment(
949
self.context.session, network_id, segment)
950
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
953
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
954
self.assertEqual('physnet2',
955
dynamic_segment[driver_api.PHYSICAL_NETWORK])
956
self.assertTrue(dynamic_segment[driver_api.SEGMENTATION_ID] > 0)
958
with mock.patch.object(type_vlan.VlanTypeDriver,
959
'release_segment') as rs:
960
req = self.new_delete_request('networks', network_id)
961
res = req.get_response(self.api)
962
self.assertEqual(2, rs.call_count)
963
self.assertEqual(ml2_db.get_network_segments(
964
self.context.session, network_id), [])
965
self.assertIsNone(ml2_db.get_dynamic_segment(
966
self.context.session, network_id, 'physnet2'))
968
def test_release_segment_no_type_driver(self):
969
data = {'network': {'name': 'net1',
970
'admin_state_up': True,
972
pnet.NETWORK_TYPE: 'vlan',
973
pnet.PHYSICAL_NETWORK: 'physnet1',
974
pnet.SEGMENTATION_ID: 1,
975
'tenant_id': 'tenant_one'}}
976
network_req = self.new_create_request('networks', data)
977
res = network_req.get_response(self.api)
978
network = self.deserialize(self.fmt, res)
979
network_id = network['network']['id']
981
segment = {driver_api.NETWORK_TYPE: 'faketype',
982
driver_api.PHYSICAL_NETWORK: 'physnet1',
984
with mock.patch('neutron.plugins.ml2.managers.LOG') as log:
985
with mock.patch('neutron.plugins.ml2.managers.db') as db:
986
db.get_network_segments.return_value = (segment,)
987
self.driver.type_manager.release_network_segments(
988
self.context.session, network_id)
990
log.error.assert_called_once_with(
991
"Failed to release segment '%s' because "
992
"network type is not supported.", segment)
994
def test_create_provider_fail(self):
995
segment = {pnet.NETWORK_TYPE: None,
996
pnet.PHYSICAL_NETWORK: 'phys_net',
997
pnet.SEGMENTATION_ID: None}
998
with testtools.ExpectedException(exc.InvalidInput):
999
self.driver.type_manager._process_provider_create(segment)
1001
def test_create_network_plugin(self):
1002
data = {'network': {'name': 'net1',
1003
'admin_state_up': True,
1005
pnet.NETWORK_TYPE: 'vlan',
1006
pnet.PHYSICAL_NETWORK: 'physnet1',
1007
pnet.SEGMENTATION_ID: 1,
1008
'tenant_id': 'tenant_one'}}
1010
def raise_mechanism_exc(*args, **kwargs):
1011
raise ml2_exc.MechanismDriverError(
1012
method='create_network_postcommit')
1014
with mock.patch('neutron.plugins.ml2.managers.MechanismManager.'
1015
'create_network_precommit', new=raise_mechanism_exc):
1016
with testtools.ExpectedException(ml2_exc.MechanismDriverError):
1017
self.driver.create_network(self.context, data)
1019
def test_extend_dictionary_no_segments(self):
1020
network = dict(name='net_no_segment', id='5', tenant_id='tenant_one')
1021
self.driver.type_manager.extend_network_dict_provider(self.context,
1023
self.assertIsNone(network[pnet.NETWORK_TYPE])
1024
self.assertIsNone(network[pnet.PHYSICAL_NETWORK])
1025
self.assertIsNone(network[pnet.SEGMENTATION_ID])
1028
class TestMl2AllowedAddressPairs(Ml2PluginV2TestCase,
1029
test_pair.TestAllowedAddressPairs):
1030
def setUp(self, plugin=None):
1031
super(test_pair.TestAllowedAddressPairs, self).setUp(
1035
class DHCPOptsTestCase(test_dhcpopts.TestExtraDhcpOpt):
1037
def setUp(self, plugin=None):
1038
super(test_dhcpopts.ExtraDhcpOptDBTestCase, self).setUp(
1042
class Ml2PluginV2FaultyDriverTestCase(test_plugin.NeutronDbPluginV2TestCase):
1045
# Enable the test mechanism driver to ensure that
1046
# we can successfully call through to all mechanism
1048
config.cfg.CONF.set_override('mechanism_drivers',
1051
super(Ml2PluginV2FaultyDriverTestCase, self).setUp(PLUGIN_NAME)
1052
self.port_create_status = 'DOWN'
1055
class TestFaultyMechansimDriver(Ml2PluginV2FaultyDriverTestCase):
1057
def test_create_network_faulty(self):
1059
with mock.patch.object(mech_test.TestMechanismDriver,
1060
'create_network_postcommit',
1061
side_effect=ml2_exc.MechanismDriverError):
1062
tenant_id = str(uuid.uuid4())
1063
data = {'network': {'name': 'net1',
1064
'tenant_id': tenant_id}}
1065
req = self.new_create_request('networks', data)
1066
res = req.get_response(self.api)
1067
self.assertEqual(500, res.status_int)
1068
error = self.deserialize(self.fmt, res)
1069
self.assertEqual('MechanismDriverError',
1070
error['NeutronError']['type'])
1071
query_params = "tenant_id=%s" % tenant_id
1072
nets = self._list('networks', query_params=query_params)
1073
self.assertFalse(nets['networks'])
1075
def test_delete_network_faulty(self):
1077
with mock.patch.object(mech_test.TestMechanismDriver,
1078
'delete_network_postcommit',
1079
side_effect=ml2_exc.MechanismDriverError):
1080
with mock.patch.object(mech_logger.LoggerMechanismDriver,
1081
'delete_network_postcommit') as dnp:
1083
data = {'network': {'name': 'net1',
1084
'tenant_id': 'tenant_one'}}
1085
network_req = self.new_create_request('networks', data)
1086
network_res = network_req.get_response(self.api)
1087
self.assertEqual(201, network_res.status_int)
1088
network = self.deserialize(self.fmt, network_res)
1089
net_id = network['network']['id']
1090
req = self.new_delete_request('networks', net_id)
1091
res = req.get_response(self.api)
1092
self.assertEqual(204, res.status_int)
1093
# Test if other mechanism driver was called
1094
self.assertTrue(dnp.called)
1095
self._show('networks', net_id,
1096
expected_code=webob.exc.HTTPNotFound.code)
1098
def test_update_network_faulty(self):
1100
with mock.patch.object(mech_test.TestMechanismDriver,
1101
'update_network_postcommit',
1102
side_effect=ml2_exc.MechanismDriverError):
1103
with mock.patch.object(mech_logger.LoggerMechanismDriver,
1104
'update_network_postcommit') as unp:
1106
data = {'network': {'name': 'net1',
1107
'tenant_id': 'tenant_one'}}
1108
network_req = self.new_create_request('networks', data)
1109
network_res = network_req.get_response(self.api)
1110
self.assertEqual(201, network_res.status_int)
1111
network = self.deserialize(self.fmt, network_res)
1112
net_id = network['network']['id']
1114
new_name = 'a_brand_new_name'
1115
data = {'network': {'name': new_name}}
1116
req = self.new_update_request('networks', data, net_id)
1117
res = req.get_response(self.api)
1118
self.assertEqual(500, res.status_int)
1119
error = self.deserialize(self.fmt, res)
1120
self.assertEqual('MechanismDriverError',
1121
error['NeutronError']['type'])
1122
# Test if other mechanism driver was called
1123
self.assertTrue(unp.called)
1124
net = self._show('networks', net_id)
1125
self.assertEqual(new_name, net['network']['name'])
1127
self._delete('networks', net_id)
1129
def test_create_subnet_faulty(self):
1131
with mock.patch.object(mech_test.TestMechanismDriver,
1132
'create_subnet_postcommit',
1133
side_effect=ml2_exc.MechanismDriverError):
1135
with self.network() as network:
1136
net_id = network['network']['id']
1137
data = {'subnet': {'network_id': net_id,
1138
'cidr': '10.0.20.0/24',
1142
network['network']['tenant_id'],
1143
'gateway_ip': '10.0.20.1'}}
1144
req = self.new_create_request('subnets', data)
1145
res = req.get_response(self.api)
1146
self.assertEqual(500, res.status_int)
1147
error = self.deserialize(self.fmt, res)
1148
self.assertEqual('MechanismDriverError',
1149
error['NeutronError']['type'])
1150
query_params = "network_id=%s" % net_id
1151
subnets = self._list('subnets', query_params=query_params)
1152
self.assertFalse(subnets['subnets'])
1154
def test_delete_subnet_faulty(self):
1156
with mock.patch.object(mech_test.TestMechanismDriver,
1157
'delete_subnet_postcommit',
1158
side_effect=ml2_exc.MechanismDriverError):
1159
with mock.patch.object(mech_logger.LoggerMechanismDriver,
1160
'delete_subnet_postcommit') as dsp:
1162
with self.network() as network:
1163
data = {'subnet': {'network_id':
1164
network['network']['id'],
1165
'cidr': '10.0.20.0/24',
1169
network['network']['tenant_id'],
1170
'gateway_ip': '10.0.20.1'}}
1171
subnet_req = self.new_create_request('subnets', data)
1172
subnet_res = subnet_req.get_response(self.api)
1173
self.assertEqual(201, subnet_res.status_int)
1174
subnet = self.deserialize(self.fmt, subnet_res)
1175
subnet_id = subnet['subnet']['id']
1177
req = self.new_delete_request('subnets', subnet_id)
1178
res = req.get_response(self.api)
1179
self.assertEqual(204, res.status_int)
1180
# Test if other mechanism driver was called
1181
self.assertTrue(dsp.called)
1182
self._show('subnets', subnet_id,
1183
expected_code=webob.exc.HTTPNotFound.code)
1185
def test_update_subnet_faulty(self):
1187
with mock.patch.object(mech_test.TestMechanismDriver,
1188
'update_subnet_postcommit',
1189
side_effect=ml2_exc.MechanismDriverError):
1190
with mock.patch.object(mech_logger.LoggerMechanismDriver,
1191
'update_subnet_postcommit') as usp:
1193
with self.network() as network:
1194
data = {'subnet': {'network_id':
1195
network['network']['id'],
1196
'cidr': '10.0.20.0/24',
1200
network['network']['tenant_id'],
1201
'gateway_ip': '10.0.20.1'}}
1202
subnet_req = self.new_create_request('subnets', data)
1203
subnet_res = subnet_req.get_response(self.api)
1204
self.assertEqual(201, subnet_res.status_int)
1205
subnet = self.deserialize(self.fmt, subnet_res)
1206
subnet_id = subnet['subnet']['id']
1207
new_name = 'a_brand_new_name'
1208
data = {'subnet': {'name': new_name}}
1209
req = self.new_update_request('subnets', data, subnet_id)
1210
res = req.get_response(self.api)
1211
self.assertEqual(500, res.status_int)
1212
error = self.deserialize(self.fmt, res)
1213
self.assertEqual('MechanismDriverError',
1214
error['NeutronError']['type'])
1215
# Test if other mechanism driver was called
1216
self.assertTrue(usp.called)
1217
subnet = self._show('subnets', subnet_id)
1218
self.assertEqual(new_name, subnet['subnet']['name'])
1220
self._delete('subnets', subnet['subnet']['id'])
1222
def test_create_port_faulty(self):
1224
with mock.patch.object(mech_test.TestMechanismDriver,
1225
'create_port_postcommit',
1226
side_effect=ml2_exc.MechanismDriverError):
1228
with self.network() as network:
1229
net_id = network['network']['id']
1230
data = {'port': {'network_id': net_id,
1232
network['network']['tenant_id'],
1234
'admin_state_up': 1,
1236
req = self.new_create_request('ports', data)
1237
res = req.get_response(self.api)
1238
self.assertEqual(500, res.status_int)
1239
error = self.deserialize(self.fmt, res)
1240
self.assertEqual('MechanismDriverError',
1241
error['NeutronError']['type'])
1242
query_params = "network_id=%s" % net_id
1243
ports = self._list('ports', query_params=query_params)
1244
self.assertFalse(ports['ports'])
1246
def test_update_port_faulty(self):
1248
with mock.patch.object(mech_test.TestMechanismDriver,
1249
'update_port_postcommit',
1250
side_effect=ml2_exc.MechanismDriverError):
1251
with mock.patch.object(mech_logger.LoggerMechanismDriver,
1252
'update_port_postcommit') as upp:
1254
with self.network() as network:
1255
data = {'port': {'network_id': network['network']['id'],
1257
network['network']['tenant_id'],
1259
'admin_state_up': 1,
1261
port_req = self.new_create_request('ports', data)
1262
port_res = port_req.get_response(self.api)
1263
self.assertEqual(201, port_res.status_int)
1264
port = self.deserialize(self.fmt, port_res)
1265
port_id = port['port']['id']
1267
new_name = 'a_brand_new_name'
1268
data = {'port': {'name': new_name}}
1269
req = self.new_update_request('ports', data, port_id)
1270
res = req.get_response(self.api)
1271
self.assertEqual(500, res.status_int)
1272
error = self.deserialize(self.fmt, res)
1273
self.assertEqual('MechanismDriverError',
1274
error['NeutronError']['type'])
1275
# Test if other mechanism driver was called
1276
self.assertTrue(upp.called)
1277
port = self._show('ports', port_id)
1278
self.assertEqual(new_name, port['port']['name'])
1280
self._delete('ports', port['port']['id'])
1283
class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
1285
super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
1286
self.context = mock.MagicMock()
1287
self.notify_p = mock.patch('neutron.callbacks.registry.notify')
1288
self.notify = self.notify_p.start()
1290
def _ensure_transaction_is_closed(self):
1291
transaction = self.context.session.begin(subtransactions=True)
1292
enter = transaction.__enter__.call_count
1293
exit = transaction.__exit__.call_count
1294
self.assertEqual(enter, exit)
1296
def _create_plugin_for_create_update_port(self, new_host_port):
1297
plugin = ml2_plugin.Ml2Plugin()
1298
plugin.extension_manager = mock.Mock()
1299
plugin.type_manager = mock.Mock()
1300
plugin.mechanism_manager = mock.Mock()
1301
plugin.notifier = mock.Mock()
1302
plugin._get_host_port_if_changed = mock.Mock(
1303
return_value=new_host_port)
1304
plugin._check_mac_update_allowed = mock.Mock(return_value=True)
1306
self.notify.side_effect = (
1307
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
1311
def test_create_port_rpc_outside_transaction(self):
1312
with contextlib.nested(
1313
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
1314
mock.patch.object(base_plugin.NeutronDbPluginV2, 'create_port'),
1315
) as (init, super_create_port):
1316
init.return_value = None
1318
new_host_port = mock.Mock()
1319
plugin = self._create_plugin_for_create_update_port(new_host_port)
1321
plugin.create_port(self.context, mock.MagicMock())
1323
kwargs = {'context': self.context, 'port': new_host_port}
1324
self.notify.assert_called_once_with('port', 'after_create',
1327
def test_update_port_rpc_outside_transaction(self):
1328
with contextlib.nested(
1329
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__'),
1330
mock.patch.object(base_plugin.NeutronDbPluginV2, 'update_port'),
1331
) as (init, super_update_port):
1332
init.return_value = None
1333
new_host_port = mock.Mock()
1334
plugin = self._create_plugin_for_create_update_port(new_host_port)
1336
plugin.update_port(self.context, 'fake_id', mock.MagicMock())
1339
'context': self.context,
1340
'port': new_host_port,
1341
'mac_address_updated': True,
1343
self.notify.assert_called_once_with('port', 'after_update',
1346
def test_notify_outside_of_delete_transaction(self):
1347
self.notify.side_effect = (
1348
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
1349
l3plugin = mock.Mock()
1350
l3plugin.supported_extension_aliases = [
1351
'router', constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
1352
constants.L3_DISTRIBUTED_EXT_ALIAS
1354
with contextlib.nested(
1355
mock.patch.object(ml2_plugin.Ml2Plugin, '__init__',
1357
mock.patch.object(manager.NeutronManager,
1358
'get_service_plugins',
1359
return_value={'L3_ROUTER_NAT': l3plugin}),
1361
plugin = self._create_plugin_for_create_update_port(mock.Mock())
1362
# deleting the port will call registry.notify, which will
1363
# run the transaction balancing function defined in this test
1364
plugin.delete_port(self.context, 'fake_id')
1365
self.assertTrue(self.notify.call_count)