78
81
conf.set_override('ovs_integration_bridge', br_int.br_name)
79
82
conf.set_override('external_network_bridge', br_ex.br_name)
81
temp_dir = self.useFixture(fixtures.TempDir()).path
82
conf.set_override('state_path', temp_dir)
84
temp_dir = self.get_new_temp_dir()
85
get_temp_file_path = functools.partial(self.get_temp_file_path,
87
conf.set_override('state_path', temp_dir.path)
83
88
conf.set_override('metadata_proxy_socket',
84
'%s/metadata_proxy' % temp_dir)
89
get_temp_file_path('metadata_proxy'))
85
90
conf.set_override('ha_confs_path',
86
'%s/ha_confs' % temp_dir)
91
get_temp_file_path('ha_confs'))
87
92
conf.set_override('external_pids',
88
'%s/external/pids' % temp_dir)
93
get_temp_file_path('external/pids'))
89
94
conf.set_override('host', host)
90
95
agent = l3_test_agent.TestL3NATAgent(host, conf)
91
mock.patch.object(agent, '_arping').start()
96
mock.patch.object(ip_lib, 'send_gratuitous_arp').start()
95
100
def generate_router_info(self, enable_ha):
96
101
return test_l3_agent.prepare_router_data(enable_snat=True,
97
102
enable_floating_ip=True,
100
106
def manage_router(self, agent, router):
101
107
self.addCleanup(self._delete_router, agent, router['id'])
192
205
'internal_device_name': internal_device_name,
193
206
'internal_device_cidr': internal_device_cidr,
194
207
'floating_ip_cidr': floating_ip_cidr,
195
'default_gateway_ip': default_gateway_ip
208
'default_gateway_ip': default_gateway_ip,
209
'int_port_ipv6': int_port_ipv6,
210
'ex_port_ipv6': ex_port_ipv6
213
def _get_rule(self, iptables_manager, table, chain, predicate):
214
rules = iptables_manager.get_chain(table, chain)
215
result = next(rule for rule in rules if predicate(rule))
218
def _assert_router_does_not_exist(self, router):
219
# If the namespace assertion succeeds
220
# then the devices and iptable rules have also been deleted,
221
# so there's no need to check that explicitly.
222
self.assertFalse(self._namespace_exists(router.ns_name))
223
self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
225
def _assert_snat_chains(self, router):
226
self.assertFalse(router.iptables_manager.is_chain_empty(
228
self.assertFalse(router.iptables_manager.is_chain_empty(
229
'nat', 'POSTROUTING'))
231
def _assert_floating_ip_chains(self, router):
232
self.assertFalse(router.iptables_manager.is_chain_empty(
233
'nat', 'float-snat'))
235
def _assert_metadata_chains(self, router):
236
metadata_port_filter = lambda rule: (
237
str(self.agent.conf.metadata_port) in rule.rule)
238
self.assertTrue(self._get_rule(router.iptables_manager,
241
metadata_port_filter))
242
self.assertTrue(self._get_rule(router.iptables_manager,
245
metadata_port_filter))
247
def _assert_internal_devices(self, router):
248
internal_devices = router.router[l3_constants.INTERFACE_KEY]
249
self.assertTrue(len(internal_devices))
250
for device in internal_devices:
251
self.assertTrue(self.device_exists_with_ip_mac(
252
device, self.agent.get_internal_device_name, router.ns_name))
254
def _assert_extra_routes(self, router):
255
routes = ip_lib.get_routing_table(self.root_helper, router.ns_name)
256
routes = [{'nexthop': route['nexthop'],
257
'destination': route['destination']} for route in routes]
259
for extra_route in router.router['routes']:
260
self.assertIn(extra_route, routes)
199
263
class L3AgentTestCase(L3AgentTestFramework):
200
264
def test_observer_notifications_legacy_router(self):
334
403
external_port['mac_address'],
335
404
router.ns_name, self.root_helper))
337
def _assert_snat_chains(self, router):
338
self.assertFalse(router.iptables_manager.is_chain_empty(
340
self.assertFalse(router.iptables_manager.is_chain_empty(
341
'nat', 'POSTROUTING'))
343
def _assert_floating_ip_chains(self, router):
344
self.assertFalse(router.iptables_manager.is_chain_empty(
345
'nat', 'float-snat'))
347
def _assert_router_does_not_exist(self, router):
348
# If the namespace assertion succeeds
349
# then the devices and iptable rules have also been deleted,
350
# so there's no need to check that explicitly.
351
self.assertFalse(self._namespace_exists(router))
352
self.assertFalse(self._metadata_proxy_exists(self.agent.conf, router))
354
406
def _assert_ha_device(self, router):
355
407
self.assertTrue(self.device_exists_with_ip_mac(
356
408
router.router[l3_constants.HA_INTERFACE_KEY],
357
self.agent.get_ha_device_name, router.ns_name))
409
router.get_ha_device_name, router.ns_name))
411
def _assert_no_ip_addresses_on_interface(self, router, interface):
412
device = ip_lib.IPDevice(interface, self.root_helper, router.ns_name)
413
self.assertEqual([], device.addr.list())
360
416
class L3HATestFramework(L3AgentTestFramework):
454
510
# Check status code
455
511
firstline = raw_headers.splitlines()[0]
456
512
self.assertIn(str(webob.exc.HTTPOk.code), firstline.split())
515
class TestDvrRouter(L3AgentTestFramework):
516
def test_dvr_router_lifecycle_without_ha_without_snat_with_fips(self):
517
self._dvr_router_lifecycle(enable_ha=False, enable_snat=False)
519
def test_dvr_router_lifecycle_without_ha_with_snat_with_fips(self):
520
self._dvr_router_lifecycle(enable_ha=False, enable_snat=True)
522
def _dvr_router_lifecycle(self, enable_ha=False, enable_snat=False):
523
'''Test dvr router lifecycle
525
:param enable_ha: sets the ha value for the router.
526
:param enable_snat: the value of enable_snat is used
527
to set the agent_mode.
530
# The value of agent_mode can be dvr, dvr_snat, or legacy.
531
# Since by definition this is a dvr (distributed = true)
532
# only dvr and dvr_snat are applicable
533
self.agent.conf.agent_mode = 'dvr_snat' if enable_snat else 'dvr'
535
# We get the router info particular to a dvr router
536
router_info = self.generate_dvr_router_info(
537
enable_ha, enable_snat)
539
# We need to mock the get_agent_gateway_port return value
540
# because the whole L3PluginApi is mocked and we need the port
541
# gateway_port information before the l3_agent will create it.
542
# The port returned needs to have the same information as
543
# router_info['gw_port']
545
neutron_l3_agent.L3PluginApi.return_value.get_agent_gateway_port)
546
mocked_gw_port.return_value = router_info['gw_port']
548
# We also need to mock the get_external_network_id method to
549
# get the correct fip namespace.
550
mocked_ext_net_id = (
551
neutron_l3_agent.L3PluginApi.return_value.get_external_network_id)
552
mocked_ext_net_id.return_value = (
553
router_info['_floatingips'][0]['floating_network_id'])
555
# With all that set we can now ask the l3_agent to
556
# manage the router (create it, create namespaces,
557
# attach interfaces, etc...)
558
router = self.manage_router(self.agent, router_info)
560
self.assertTrue(self._namespace_exists(router.ns_name))
561
self.assertTrue(self._metadata_proxy_exists(self.agent.conf, router))
562
self._assert_internal_devices(router)
563
self._assert_dvr_external_device(router)
564
self._assert_dvr_gateway(router)
565
self._assert_dvr_floating_ips(router)
566
self._assert_snat_chains(router)
567
self._assert_floating_ip_chains(router)
568
self._assert_metadata_chains(router)
569
self._assert_extra_routes(router)
571
self._delete_router(self.agent, router.router_id)
572
self._assert_router_does_not_exist(router)
574
def generate_dvr_router_info(self, enable_ha=False, enable_snat=False):
575
router = test_l3_agent.prepare_router_data(
576
enable_snat=enable_snat,
577
enable_floating_ip=True,
579
internal_ports = router.get(l3_constants.INTERFACE_KEY, [])
580
router['distributed'] = True
581
router['gw_port_host'] = self.agent.conf.host
582
router['gw_port']['binding:host_id'] = self.agent.conf.host
583
floating_ip = router['_floatingips'][0]
584
floating_ip['floating_network_id'] = router['gw_port']['network_id']
585
floating_ip['host'] = self.agent.conf.host
586
floating_ip['port_id'] = internal_ports[0]['id']
587
floating_ip['status'] = 'ACTIVE'
592
self._add_snat_port_info_to_router(router, internal_ports)
595
def _add_snat_port_info_to_router(self, router, internal_ports):
596
# Add snat port information to the router
597
snat_port_list = router.get(l3_constants.SNAT_ROUTER_INTF_KEY, [])
598
if not snat_port_list and internal_ports:
599
# Get values from internal port
600
port = internal_ports[0]
601
fixed_ip = port['fixed_ips'][0]
602
snat_subnet = port['subnet']
603
port_ip = fixed_ip['ip_address']
604
# Pick an ip address which is not the same as port_ip
605
snat_ip = str(netaddr.IPAddress(port_ip) + 5)
606
# Add the info to router as the first snat port
607
# in the list of snat ports
608
router[l3_constants.SNAT_ROUTER_INTF_KEY] = [
610
{'cidr': snat_subnet['cidr'],
611
'gateway_ip': snat_subnet['gateway_ip'],
612
'id': fixed_ip['subnet_id']},
613
'network_id': port['network_id'],
614
'device_owner': 'network:router_centralized_snat',
615
'mac_address': 'fa:16:3e:80:8d:89',
616
'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
617
'ip_address': snat_ip}],
619
'device_id': _uuid()}
622
def _assert_dvr_external_device(self, router):
623
external_port = self.agent._get_ex_gw_port(router)
624
snat_ns_name = self.agent.get_snat_ns_name(router.router_id)
626
# if the agent is in dvr_snat mode, then we have to check
627
# that the correct ports and ip addresses exist in the
628
# snat_ns_name namespace
629
if self.agent.conf.agent_mode == 'dvr_snat':
630
self.assertTrue(self.device_exists_with_ip_mac(
631
external_port, self.agent.get_external_device_name,
633
# if the agent is in dvr mode then the snat_ns_name namespace
634
# should not be present at all:
635
elif self.agent.conf.agent_mode == 'dvr':
637
self._namespace_exists(snat_ns_name),
638
"namespace %s was found but agent is in dvr mode not dvr_snat"
639
% (str(snat_ns_name))
641
# if the agent is anything else the test is misconfigured
642
# we force a test failure with message
644
self.assertTrue(False, " agent not configured for dvr or dvr_snat")
646
def _assert_dvr_gateway(self, router):
647
gateway_expected_in_snat_namespace = (
648
self.agent.conf.agent_mode == 'dvr_snat'
650
if gateway_expected_in_snat_namespace:
651
self._assert_dvr_snat_gateway(router)
653
snat_namespace_should_not_exist = (
654
self.agent.conf.agent_mode == 'dvr'
656
if snat_namespace_should_not_exist:
657
self._assert_snat_namespace_does_not_exist(router)
659
def _assert_dvr_snat_gateway(self, router):
660
namespace = self.agent.get_snat_ns_name(router.router_id)
661
external_port = self.agent._get_ex_gw_port(router)
662
external_device_name = self.agent.get_external_device_name(
664
external_device = ip_lib.IPDevice(external_device_name,
668
external_device.route.get_gateway().get('gateway'))
669
expected_gateway = external_port['subnet']['gateway_ip']
670
self.assertEqual(expected_gateway, existing_gateway)
672
def _assert_snat_namespace_does_not_exist(self, router):
673
namespace = self.agent.get_snat_ns_name(router.router_id)
674
self.assertFalse(self._namespace_exists(namespace))
676
def _assert_dvr_floating_ips(self, router):
677
# in the fip namespace:
678
# Check that the fg-<port-id> (floatingip_agent_gateway)
679
# is created with the ip address of the external gateway port
680
floating_ips = router.router[l3_constants.FLOATINGIP_KEY]
681
self.assertTrue(floating_ips)
683
external_port = self.agent._get_ex_gw_port(router)
684
fip_ns = self.agent.get_fip_ns(floating_ips[0]['floating_network_id'])
685
fip_ns_name = fip_ns.get_name()
686
fg_port_created_succesfully = ip_lib.device_exists_with_ip_mac(
687
fip_ns.get_ext_device_name(external_port['id']),
688
external_port['ip_cidr'],
689
external_port['mac_address'],
690
fip_ns_name, self.root_helper)
691
self.assertTrue(fg_port_created_succesfully)
692
# Check fpr-router device has been created
693
device_name = fip_ns.get_int_device_name(router.router_id)
694
fpr_router_device_created_succesfully = ip_lib.device_exists(
695
device_name, self.root_helper, fip_ns_name)
696
self.assertTrue(fpr_router_device_created_succesfully)
698
# In the router namespace
699
# Check rfp-<router-id> is created correctly
700
for fip in floating_ips:
701
device_name = fip_ns.get_rtr_ext_device_name(router.router_id)
702
self.assertTrue(ip_lib.device_exists(
703
device_name, self.root_helper, router.ns_name))