943
944
with self.port() as p:
944
945
private_sub = {'subnet': {'id':
945
946
p['port']['fixed_ips'][0]['subnet_id']}}
946
with self.floatingip_no_assoc(private_sub) as fip:
947
port_id = p['port']['id']
948
body = self._update('floatingips', fip['floatingip']['id'],
949
{'floatingip': {'port_id': port_id}})
950
self.assertEqual(body['floatingip']['port_id'], port_id)
952
body = self._update('floatingips', fip['floatingip']['id'],
953
{'floatingip': {'port_id': None}})
954
body = self._show('floatingips', fip['floatingip']['id'])
955
self.assertIsNone(body['floatingip']['port_id'])
956
self.assertIsNone(body['floatingip']['fixed_ip_address'])
947
plugin = manager.NeutronManager.get_plugin()
948
with mock.patch.object(plugin, 'notify_routers_updated') as notify:
949
with self.floatingip_no_assoc(private_sub) as fip:
950
port_id = p['port']['id']
951
body = self._update('floatingips', fip['floatingip']['id'],
952
{'floatingip': {'port_id': port_id}})
953
self.assertEqual(body['floatingip']['port_id'], port_id)
955
body = self._update('floatingips', fip['floatingip']['id'],
956
{'floatingip': {'port_id': None}})
957
body = self._show('floatingips', fip['floatingip']['id'])
958
self.assertIsNone(body['floatingip']['port_id'])
959
self.assertIsNone(body['floatingip']['fixed_ip_address'])
961
# check that notification was not requested
962
self.assertFalse(notify.called)
958
964
def test_create_router_maintenance_returns_503(self):
959
965
with self._create_l3_ext_network() as net:
1145
1151
res = req.get_response(self.ext_api)
1146
1152
self.assertEqual(res.status_int, 200)
1154
def _test_remove_router_interface_nsx_out_of_sync(self, unsync_action):
1155
# Create external network and subnet
1156
ext_net_id = self._create_network_and_subnet('1.1.1.0/24', True)[0]
1157
# Create internal network and subnet
1158
int_sub_id = self._create_network_and_subnet('10.0.0.0/24')[1]
1159
res = self._create_router('json', 'tenant')
1160
router = self.deserialize('json', res)
1161
# Set gateway and add interface to router (needed to generate NAT rule)
1162
req = self.new_update_request(
1164
{'router': {'external_gateway_info':
1165
{'network_id': ext_net_id}}},
1166
router['router']['id'])
1167
res = req.get_response(self.ext_api)
1168
self.assertEqual(res.status_int, 200)
1169
req = self.new_action_request(
1171
{'subnet_id': int_sub_id},
1172
router['router']['id'],
1173
"add_router_interface")
1174
res = req.get_response(self.ext_api)
1175
self.assertEqual(res.status_int, 200)
1177
req = self.new_action_request(
1179
{'subnet_id': int_sub_id},
1180
router['router']['id'],
1181
"remove_router_interface")
1182
res = req.get_response(self.ext_api)
1183
self.assertEqual(res.status_int, 200)
1148
1185
def test_remove_router_interface_not_in_nsx(self):
1149
# Create internal network and subnet
1150
int_sub_id = self._create_network_and_subnet('10.0.0.0/24')[1]
1151
res = self._create_router('json', 'tenant')
1152
router = self.deserialize('json', res)
1153
# Add interface to router (needed to generate NAT rule)
1154
req = self.new_action_request(
1156
{'subnet_id': int_sub_id},
1157
router['router']['id'],
1158
"add_router_interface")
1159
res = req.get_response(self.ext_api)
1160
self.assertEqual(res.status_int, 200)
1161
self.fc._fake_lrouter_dict.clear()
1162
req = self.new_action_request(
1164
{'subnet_id': int_sub_id},
1165
router['router']['id'],
1166
"remove_router_interface")
1167
res = req.get_response(self.ext_api)
1168
self.assertEqual(res.status_int, 200)
1187
def unsync_action():
1188
self.fc._fake_lrouter_dict.clear()
1189
self.fc._fake_lrouter_nat_dict.clear()
1191
self._test_remove_router_interface_nsx_out_of_sync(unsync_action)
1193
def test_remove_router_interface_nat_rule_not_in_nsx(self):
1194
self._test_remove_router_interface_nsx_out_of_sync(
1195
self.fc._fake_lrouter_nat_dict.clear)
1197
def test_remove_router_interface_duplicate_nat_rules_in_nsx(self):
1199
def unsync_action():
1200
# duplicate every entry in the nat rule dict
1201
for (_rule_id, rule) in self.fc._fake_lrouter_nat_dict.items():
1202
self.fc._fake_lrouter_nat_dict[uuid.uuid4()] = rule
1204
self._test_remove_router_interface_nsx_out_of_sync(unsync_action)
1170
1206
def test_update_router_not_in_nsx(self):
1171
1207
res = self._create_router('json', 'tenant')