146
149
get_instance_by_floating_ip_addr)
147
150
self.stubs.Set(compute_utils, "get_nw_info_for_instance",
148
151
stub_nw_info(self.stubs))
153
osapi_compute_extension=[
154
'nova.api.openstack.compute.contrib.select_extensions'],
155
osapi_compute_ext_list=['Floating_ips'])
150
157
fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs,
151
158
spectacular=True)
198
205
self.assertEqual(res_dict, response)
207
def test_floating_ip_release_nonexisting(self):
208
def fake_get_floating_ip(*args, **kwargs):
209
raise exception.FloatingIpNotFound(id=id)
211
self.stubs.Set(network.api.API, "get_floating_ip",
212
fake_get_floating_ip)
214
req = fakes.HTTPRequest.blank('/v2/fake/os-floating-ips/9876')
215
req.method = 'DELETE'
216
res = req.get_response(fakes.wsgi_app(init_only=('os-floating-ips',)))
217
self.assertEqual(res.status_int, 404)
218
expected_msg = ('{"itemNotFound": {"message": "Floating ip not found '
219
'for id 9876", "code": 404}}')
220
self.assertEqual(res.body, expected_msg)
200
222
def test_floating_ip_show(self):
201
223
req = fakes.HTTPRequest.blank('/v2/fake/os-floating-ips/1')
202
224
res_dict = self.controller.show(req, 1)
215
237
req = fakes.HTTPRequest.blank('/v2/fake/os-floating-ips/9876')
217
self.assertRaises(webob.exc.HTTPNotFound,
218
self.controller.show, req, 9876)
239
res = req.get_response(fakes.wsgi_app(init_only=('os-floating-ips',)))
240
self.assertEqual(res.status_int, 404)
241
expected_msg = ('{"itemNotFound": {"message": "Floating ip not found '
242
'for id 9876", "code": 404}}')
243
self.assertEqual(res.body, expected_msg)
220
245
def test_show_associated_floating_ip(self):
221
246
def get_floating_ip(self, context, id):
307
332
rsp = self.manager._add_floating_ip(req, 'test_inst', body)
308
333
self.assertTrue(rsp.status_int == 202)
335
def test_associate_not_allocated_floating_ip_to_instance(self):
337
def fake_associate_floating_ip(self, context, instance,
338
floating_address, fixed_address,
339
affect_auto_assigned=False):
340
raise exception.NotAuthorized()
341
self.stubs.Set(network.api.API, "associate_floating_ip",
342
fake_associate_floating_ip)
343
floating_ip = '10.10.10.11'
344
body = dict(addFloatingIp=dict(address=floating_ip))
345
req = webob.Request.blank('/v2/fake/servers/test_inst/action')
347
req.body = jsonutils.dumps(body)
348
req.headers["content-type"] = "application/json"
349
resp = req.get_response(fakes.wsgi_app(init_only=('servers',)))
350
res_dict = jsonutils.loads(resp.body)
351
self.assertEqual(resp.status_int, 404)
352
self.assertEqual(res_dict['itemNotFound']['message'],
353
"floating ip not found")
310
355
def test_floating_ip_disassociate(self):
311
356
def get_instance_by_floating_ip_addr(self, context, address):
312
357
if address == '10.10.10.10':
328
373
rsp = self.manager._remove_floating_ip(req, 'test_inst', body)
329
374
self.assertTrue(rsp.status_int == 404)
376
def test_floating_ip_associate_non_existent_ip(self):
377
def fake_network_api_associate(self, context, instance,
378
floating_address=None,
380
floating_ips = ["10.10.10.10", "10.10.10.11"]
381
if floating_address not in floating_ips:
382
raise exception.FloatingIpNotFoundForAddress()
384
self.stubs.Set(network.api.API, "associate_floating_ip",
385
fake_network_api_associate)
387
body = dict(addFloatingIp=dict(address='1.1.1.1'))
388
req = fakes.HTTPRequest.blank('/v2/fake/servers/test_inst/action')
389
self.assertRaises(webob.exc.HTTPNotFound,
390
self.manager._add_floating_ip,
391
req, 'test_inst', body)
393
def test_floating_ip_disassociate_non_existent_ip(self):
394
def network_api_get_floating_ip_by_address(self, context,
396
floating_ips = ["10.10.10.10", "10.10.10.11"]
397
if floating_address not in floating_ips:
398
raise exception.FloatingIpNotFoundForAddress()
400
self.stubs.Set(network.api.API, "get_floating_ip_by_address",
401
network_api_get_floating_ip_by_address)
403
body = dict(removeFloatingIp=dict(address='1.1.1.1'))
404
req = fakes.HTTPRequest.blank('/v2/fake/servers/test_inst/action')
405
self.assertRaises(webob.exc.HTTPNotFound,
406
self.manager._remove_floating_ip,
407
req, 'test_inst', body)
331
409
def test_floating_ip_disassociate_wrong_instance_uuid(self):
332
410
def get_instance_by_floating_ip_addr(self, context, address):
333
411
if address == '10.10.10.10':
357
435
rsp = self.manager._remove_floating_ip(req, 'test_inst', body)
358
436
self.assertTrue(rsp.status_int == 404)
438
def test_floating_ip_disassociate_auto_assigned(self):
439
def fake_get_floating_ip_addr_auto_assigned(self, context, address):
440
return {'id': 1, 'address': '10.10.10.10', 'pool': 'nova',
441
'fixed_ip_id': 10, 'auto_assigned': 1}
443
def get_instance_by_floating_ip_addr(self, context, address):
444
if address == '10.10.10.10':
447
def network_api_disassociate(self, context, instance,
449
raise exception.CannotDisassociateAutoAssignedFloatingIP()
451
self.stubs.Set(network.api.API, "get_floating_ip_by_address",
452
fake_get_floating_ip_addr_auto_assigned)
453
self.stubs.Set(network.api.API, "get_instance_id_by_floating_address",
454
get_instance_by_floating_ip_addr)
455
self.stubs.Set(network.api.API, "disassociate_floating_ip",
456
network_api_disassociate)
457
body = dict(removeFloatingIp=dict(address='10.10.10.10'))
458
req = fakes.HTTPRequest.blank('/v2/fake/servers/test_inst/action')
459
self.assertRaises(webob.exc.HTTPForbidden,
460
self.manager._remove_floating_ip,
461
req, 'test_inst', body)
463
def test_floating_ip_disassociate_map_authorization_exc(self):
464
def fake_get_floating_ip_addr_auto_assigned(self, context, address):
465
return {'id': 1, 'address': '10.10.10.10', 'pool': 'nova',
466
'fixed_ip_id': 10, 'auto_assigned': 1}
468
def get_instance_by_floating_ip_addr(self, context, address):
469
if address == '10.10.10.10':
472
def network_api_disassociate(self, context, instance, address):
473
raise exception.NotAuthorized()
475
self.stubs.Set(network.api.API, "get_floating_ip_by_address",
476
fake_get_floating_ip_addr_auto_assigned)
477
self.stubs.Set(network.api.API, "get_instance_id_by_floating_address",
478
get_instance_by_floating_ip_addr)
479
self.stubs.Set(network.api.API, "disassociate_floating_ip",
480
network_api_disassociate)
481
body = dict(removeFloatingIp=dict(address='10.10.10.10'))
482
req = fakes.HTTPRequest.blank('/v2/fake/servers/test_inst/action')
483
self.assertRaises(webob.exc.HTTPForbidden,
484
self.manager._remove_floating_ip,
485
req, 'test_inst', body)
360
487
# these are a few bad param tests
362
489
def test_bad_address_param_in_remove_floating_ip(self):