950
950
self.adminContext = q_context.get_admin_context()
951
951
self.dut = L3DvrScheduler()
953
def test__notify_l3_agent_update_port_no_removing_routers(self):
954
port_id = 'fake-port'
956
'context': self.adminContext,
960
'binding:host_id': 'vm-host',
961
'device_id': 'vm-id',
962
'device_owner': 'compute:None',
963
'mac_address': '02:04:05:17:18:19'
965
'mac_address_updated': True
968
plugin = manager.NeutronManager.get_plugin()
969
l3plugin = mock.Mock()
970
l3plugin.supported_extension_aliases = [
971
'router', constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
972
constants.L3_DISTRIBUTED_EXT_ALIAS
975
with mock.patch.object(manager.NeutronManager,
976
'get_service_plugins',
977
return_value={'L3_ROUTER_NAT': l3plugin}):
978
l3_dvrscheduler_db._notify_l3_agent_port_update(
979
'port', 'after_update', plugin, **kwargs)
981
self.assertFalse(l3plugin.dvr_vmarp_table_update.called)
982
self.assertFalse(l3plugin.dvr_update_router_addvm.called)
983
self.assertFalse(l3plugin.remove_router_from_l3_agent.called)
984
self.assertFalse(l3plugin.dvr_deletens_if_no_port.called)
986
def test__notify_l3_agent_update_port_removing_routers(self):
987
port_id = 'fake-port'
989
'context': self.adminContext,
992
'binding:host_id': None,
996
'mac_address_updated': False,
999
'binding:host_id': 'vm-host',
1000
'device_id': 'vm-id',
1001
'device_owner': 'compute:None'
1005
plugin = manager.NeutronManager.get_plugin()
1006
l3plugin = mock.Mock()
1007
l3plugin.supported_extension_aliases = [
1008
'router', constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
1009
constants.L3_DISTRIBUTED_EXT_ALIAS
1011
with mock.patch.object(manager.NeutronManager,
1012
'get_service_plugins',
1013
return_value={'L3_ROUTER_NAT': l3plugin}),\
1014
mock.patch.object(l3plugin, 'dvr_deletens_if_no_port',
1015
return_value=[{'agent_id': 'foo_agent',
1016
'router_id': 'foo_id'}]):
1017
l3_dvrscheduler_db._notify_l3_agent_port_update(
1018
'port', 'after_update', plugin, **kwargs)
1020
self.assertEqual(1, l3plugin.dvr_vmarp_table_update.call_count)
1021
l3plugin.dvr_vmarp_table_update.assert_called_once_with(
1022
self.adminContext, mock.ANY, 'del')
1024
self.assertFalse(l3plugin.dvr_update_router_addvm.called)
1025
l3plugin.remove_router_from_l3_agent.assert_called_once_with(
1026
self.adminContext, 'foo_agent', 'foo_id')
953
1028
def test__notify_port_delete(self):
954
1029
plugin = manager.NeutronManager.get_plugin()
955
1030
l3plugin = mock.Mock()
1108
1183
'my-subnet-id')
1109
1184
self.assertTrue(result)
1111
def test_dvr_serviced_vip_port_exists_on_subnet(self):
1186
def _test_dvr_serviced_vip_port_exists_on_subnet(self, device_owner):
1113
1188
'id': 'lbaas-vip-port1',
1114
1189
'device_id': 'vip-pool-id',
1115
1190
'status': 'ACTIVE',
1116
1191
'binding:host_id': 'thisHost',
1117
'device_owner': constants.DEVICE_OWNER_LOADBALANCER,
1192
'device_owner': device_owner,
1120
1195
'subnet_id': 'my-subnet-id',
1125
1200
self._test_dvr_serviced_port_exists_on_subnet(port=vip_port)
1202
def test_dvr_serviced_lbaas_vip_port_exists_on_subnet(self):
1203
self._test_dvr_serviced_vip_port_exists_on_subnet(
1204
device_owner=constants.DEVICE_OWNER_LOADBALANCER)
1206
def test_dvr_serviced_lbaasv2_vip_port_exists_on_subnet(self):
1207
self._test_dvr_serviced_vip_port_exists_on_subnet(
1208
device_owner=constants.DEVICE_OWNER_LOADBALANCERV2)
1127
1210
def _create_port(self, port_name, tenant_id, host, subnet_id, ip_address,
1128
1211
status='ACTIVE',
1129
1212
device_owner='compute:nova'):