621
642
# specify only first and last network
622
643
req_ids = [net['id'] for net in (self.nets3[0], self.nets3[-1])]
623
644
self._get_available_networks(prv_nets, pub_nets, req_ids)
646
def test_get_floating_ip_pools(self):
# Checks that get_floating_ip_pools() returns one {'name': ...} dict for each
# network the mocked client is set up to return for the
# 'router:external' search.
647
api = quantumapi.API()
648
search_opts = {'router:external': True}
649
self.moxed_client.list_networks(**search_opts).\
650
AndReturn({'networks': [self.fip_pool, self.fip_pool_nova]})
# NOTE(review): the embedded numbering skips 651 here, so a statement may be
# missing from this excerpt (presumably the mox replay call) -- confirm
# against the full file.
652
pools = api.get_floating_ip_pools(self.context)
653
expected = [{'name': self.fip_pool['name']},
654
{'name': self.fip_pool_nova['name']}]
655
self.assertEqual(expected, pools)
657
def _get_expected_fip_model(self, fip_data, idx=0):
# Builds the dict the tests compare API results against for the floating IP
# record `fip_data`. `idx` selects the entry of self.port_data2 whose
# 'device_id' is used as the instance uuid when fip_data['port_id'] is set.
# NOTE(review): this excerpt looks truncated -- the embedded numbering skips
# 663 and 667-669, and the key for the {'address': ...} value, the else-branch
# and closing of the 'instance' expression, and any return statement are not
# visible. Confirm against the full file.
658
expected = {'id': fip_data['id'],
659
'address': fip_data['floating_ip_address'],
660
'pool': self.fip_pool['name'],
661
'project_id': fip_data['tenant_id'],
662
'fixed_ip_id': fip_data['port_id'],
664
{'address': fip_data['fixed_ip_address']},
665
'instance': ({'uuid': self.port_data2[idx]['device_id']}
666
if fip_data['port_id']
670
def _test_get_floating_ip(self, fip_data, idx=0, by_address=False):
# Shared driver for the get-floating-ip tests: records the client calls
# expected while looking up `fip_data`, then compares the API result with
# the model built by _get_expected_fip_model(fip_data, idx).
# NOTE(review): `by_address` is never referenced in the visible lines, and the
# by-address and by-id expectations/calls appear back to back. The embedded
# numbering has gaps (675, 678, 686-687, 689-690, 692), so the branching on
# `by_address` and the mox replay call are presumably missing from this
# excerpt -- confirm against the full file.
671
api = quantumapi.API()
672
fip_id = fip_data['id']
673
net_id = fip_data['floating_network_id']
674
address = fip_data['floating_ip_address']
676
self.moxed_client.list_floatingips(floating_ip_address=address).\
677
AndReturn({'floatingips': [fip_data]})
679
self.moxed_client.show_floatingip(fip_id).\
680
AndReturn({'floatingip': fip_data})
# The pool (network) lookup is expected for every floating IP.
681
self.moxed_client.show_network(net_id).\
682
AndReturn({'network': self.fip_pool})
# A port lookup is only expected when the floating IP has a port_id.
683
if fip_data['port_id']:
684
self.moxed_client.show_port(fip_data['port_id']).\
685
AndReturn({'port': self.port_data2[idx]})
688
expected = self._get_expected_fip_model(fip_data, idx)
691
fip = api.get_floating_ip_by_address(self.context, address)
693
fip = api.get_floating_ip(self.context, fip_id)
694
self.assertEqual(expected, fip)
696
def test_get_floating_ip_unassociated(self):
# Runs the shared get-floating-ip driver on the unassociated fixture,
# using entry 0 of self.port_data2.
697
self._test_get_floating_ip(self.fip_unassociated, idx=0)
699
def test_get_floating_ip_associated(self):
# Runs the shared get-floating-ip driver on the associated fixture,
# using entry 1 of self.port_data2.
700
self._test_get_floating_ip(self.fip_associated, idx=1)
702
def test_get_floating_ip_by_address(self):
# Runs the shared get-floating-ip driver on the unassociated fixture.
# NOTE(review): the call below is cut off after `idx=0,` -- the remaining
# argument(s) and the closing parenthesis are not visible in this excerpt
# (presumably by_address=True; confirm against the full file).
703
self._test_get_floating_ip(self.fip_unassociated, idx=0,
706
def test_get_floating_ip_by_address_associated(self):
# Runs the shared get-floating-ip driver on the associated fixture.
# NOTE(review): the call below is cut off after `idx=1,` -- the remaining
# argument(s) and the closing parenthesis are not visible in this excerpt
# (presumably by_address=True; confirm against the full file).
707
self._test_get_floating_ip(self.fip_associated, idx=1,
710
def test_get_floating_ip_by_address_not_found(self):
# Expects get_floating_ip_by_address() to raise FloatingIpNotFoundForAddress
# when the client's floating IP listing for the address is empty.
711
api = quantumapi.API()
712
address = self.fip_unassociated['floating_ip_address']
713
self.moxed_client.list_floatingips(floating_ip_address=address).\
714
AndReturn({'floatingips': []})
# NOTE(review): embedded numbering skips 715 here; a statement (presumably
# the mox replay call) may be missing from this excerpt -- confirm.
716
self.assertRaises(exception.FloatingIpNotFoundForAddress,
717
api.get_floating_ip_by_address,
718
self.context, address)
720
def test_get_floating_ip_by_address_multiple_found(self):
# Expects get_floating_ip_by_address() to raise
# FloatingIpMultipleFoundForAddress when the client's listing for the
# address contains two entries.
721
api = quantumapi.API()
722
address = self.fip_unassociated['floating_ip_address']
723
self.moxed_client.list_floatingips(floating_ip_address=address).\
724
AndReturn({'floatingips': [self.fip_unassociated] * 2})
# NOTE(review): embedded numbering skips 725 here; a statement (presumably
# the mox replay call) may be missing from this excerpt -- confirm.
726
self.assertRaises(exception.FloatingIpMultipleFoundForAddress,
727
api.get_floating_ip_by_address,
728
self.context, address)
730
def test_get_floating_ips_by_project(self):
# Checks that get_floating_ips_by_project() returns the expected model for
# both floating IP fixtures. Three client calls are recorded: floating IPs
# and ports filtered by the context's project id, and the external networks.
731
api = quantumapi.API()
732
project_id = self.context.project_id
733
self.moxed_client.list_floatingips(tenant_id=project_id).\
734
AndReturn({'floatingips': [self.fip_unassociated,
735
self.fip_associated]})
736
search_opts = {'router:external': True}
737
self.moxed_client.list_networks(**search_opts).\
738
AndReturn({'networks': [self.fip_pool, self.fip_pool_nova]})
739
self.moxed_client.list_ports(tenant_id=project_id).\
740
AndReturn({'ports': self.port_data2})
# NOTE(review): embedded numbering skips 741-742 here; a statement (presumably
# the mox replay call) may be missing from this excerpt -- confirm.
743
expected = [self._get_expected_fip_model(self.fip_unassociated),
744
self._get_expected_fip_model(self.fip_associated, idx=1)]
745
fips = api.get_floating_ips_by_project(self.context)
746
self.assertEqual(expected, fips)
748
def _test_get_instance_id_by_floating_address(self, fip_data,
# Shared driver: records the floating IP lookup by address (and a port
# lookup), then compares get_instance_id_by_floating_address() with the
# expected instance id.
# NOTE(review): the signature above is cut off after `fip_data,` and the
# embedded numbering has gaps (749, 754, 757-759, 761-762). In the visible
# lines the port lookup and the expected value always use self.port_data2[1];
# any conditional handling for an unassociated floating IP is presumably
# missing from this excerpt -- confirm against the full file.
750
api = quantumapi.API()
751
address = fip_data['floating_ip_address']
752
self.moxed_client.list_floatingips(floating_ip_address=address).\
753
AndReturn({'floatingips': [fip_data]})
755
self.moxed_client.show_port(fip_data['port_id']).\
756
AndReturn({'port': self.port_data2[1]})
760
expected = self.port_data2[1]['device_id']
763
fip = api.get_instance_id_by_floating_address(self.context, address)
764
self.assertEqual(expected, fip)
766
def test_get_instance_id_by_floating_address(self):
# Runs the shared instance-id-by-floating-address driver on the unassociated
# fixture.
767
self._test_get_instance_id_by_floating_address(self.fip_unassociated)
769
def test_get_instance_id_by_floating_address_associated(self):
# Runs the shared instance-id-by-floating-address driver on the associated
# fixture.
# NOTE(review): the call below is cut off after `self.fip_associated,` -- the
# remaining argument(s) and closing parenthesis are not visible here.
770
self._test_get_instance_id_by_floating_address(self.fip_associated,
773
def test_allocate_floating_ip(self):
# Checks that allocate_floating_ip() called with the pool name 'ext_net'
# creates a floating IP on self.fip_pool's network id and returns the
# created floating IP's address.
# NOTE(review): the `search_opts` literal is cut off after its first entry
# (embedded numbering skips 778-779), and `pool_name` is not referenced in
# the visible lines -- presumably it is used in the missing part of the
# filter. Confirm against the full file.
774
api = quantumapi.API()
775
pool_name = self.fip_pool['name']
776
pool_id = self.fip_pool['id']
777
search_opts = {'router:external': True,
780
self.moxed_client.list_networks(**search_opts).\
781
AndReturn({'networks': [self.fip_pool]})
782
self.moxed_client.create_floatingip(
783
{'floatingip': {'floating_network_id': pool_id}}).\
784
AndReturn({'floatingip': self.fip_unassociated})
786
fip = api.allocate_floating_ip(self.context, 'ext_net')
787
self.assertEqual(fip, self.fip_unassociated['floating_ip_address'])
789
def test_allocate_floating_ip_with_pool_id(self):
# Checks that allocate_floating_ip() called with the pool's id (instead of
# its name) creates a floating IP on that network and returns the created
# floating IP's address.
# NOTE(review): the `search_opts` literal is cut off after its first entry
# (embedded numbering skips 794-795), and `pool_name` is not referenced in
# the visible lines. Confirm against the full file.
790
api = quantumapi.API()
791
pool_name = self.fip_pool['name']
792
pool_id = self.fip_pool['id']
793
search_opts = {'router:external': True,
796
self.moxed_client.list_networks(**search_opts).\
797
AndReturn({'networks': [self.fip_pool]})
798
self.moxed_client.create_floatingip(
799
{'floatingip': {'floating_network_id': pool_id}}).\
800
AndReturn({'floatingip': self.fip_unassociated})
802
fip = api.allocate_floating_ip(self.context, pool_id)
803
self.assertEqual(fip, self.fip_unassociated['floating_ip_address'])
805
def test_allocate_floating_ip_with_default_pool(self):
# Checks that allocate_floating_ip() called without a pool argument creates
# a floating IP on self.fip_pool_nova's network id and returns the created
# floating IP's address.
# NOTE(review): the `search_opts` literal is cut off after its first entry
# (embedded numbering skips 810-811), and `pool_name` is not referenced in
# the visible lines. Confirm against the full file.
806
api = quantumapi.API()
807
pool_name = self.fip_pool_nova['name']
808
pool_id = self.fip_pool_nova['id']
809
search_opts = {'router:external': True,
812
self.moxed_client.list_networks(**search_opts).\
813
AndReturn({'networks': [self.fip_pool_nova]})
814
self.moxed_client.create_floatingip(
815
{'floatingip': {'floating_network_id': pool_id}}).\
816
AndReturn({'floatingip': self.fip_unassociated})
818
fip = api.allocate_floating_ip(self.context)
819
self.assertEqual(fip, self.fip_unassociated['floating_ip_address'])
821
def test_release_floating_ip(self):
# Releasing the unassociated floating IP: the floating IP is looked up by
# address and a delete for its id is expected on the client.
822
api = quantumapi.API()
823
address = self.fip_unassociated['floating_ip_address']
824
fip_id = self.fip_unassociated['id']
826
self.moxed_client.list_floatingips(floating_ip_address=address).\
827
AndReturn({'floatingips': [self.fip_unassociated]})
828
self.moxed_client.delete_floatingip(fip_id)
# NOTE(review): embedded numbering skips 829 here; a statement (presumably
# the mox replay call) may be missing from this excerpt -- confirm.
830
api.release_floating_ip(self.context, address)
832
def test_release_floating_ip_associated(self):
# Expects release_floating_ip() to raise FloatingIpAssociated when the
# floating IP found for the address is the associated fixture. No delete
# call is recorded on the client.
833
api = quantumapi.API()
834
address = self.fip_associated['floating_ip_address']
# NOTE(review): fip_id is assigned but not referenced in the visible lines.
835
fip_id = self.fip_associated['id']
837
self.moxed_client.list_floatingips(floating_ip_address=address).\
838
AndReturn({'floatingips': [self.fip_associated]})
840
self.assertRaises(exception.FloatingIpAssociated,
841
api.release_floating_ip, self.context, address)
843
def _setup_mock_for_refresh_cache(self, api):
# Stubs out api._get_instance_nw_info and api.db.instance_info_cache_update
# and records one expected call to each for self.instance.
# NOTE(review): both recorded calls are cut off in this excerpt (embedded
# numbering skips 845, 848 and 852-853): the return value of the first and
# the remaining arguments of the second are not visible, and `nw_info` is
# not referenced in the visible lines. Confirm against the full file.
844
nw_info = self.mox.CreateMock(model.NetworkInfo)
846
self.mox.StubOutWithMock(api, '_get_instance_nw_info')
847
api._get_instance_nw_info(mox.IgnoreArg(), self.instance).\
849
self.mox.StubOutWithMock(api.db, 'instance_info_cache_update')
850
api.db.instance_info_cache_update(mox.IgnoreArg(),
851
self.instance['uuid'],
854
def test_associate_floating_ip(self):
# Records the client calls expected when associating a floating IP with
# self.instance, sets up the cache-refresh expectations, then calls
# associate_floating_ip().
855
api = quantumapi.API()
856
address = self.fip_associated['floating_ip_address']
857
fixed_address = self.fip_associated['fixed_ip_address']
858
fip_id = self.fip_associated['id']
# Ports are looked up by the instance's uuid with a 'compute:nova' owner.
860
search_opts = {'device_owner': 'compute:nova',
861
'device_id': self.instance['uuid']}
862
self.moxed_client.list_ports(**search_opts).\
863
AndReturn({'ports': [self.port_data2[1]]})
864
self.moxed_client.list_floatingips(floating_ip_address=address).\
865
AndReturn({'floatingips': [self.fip_associated]})
866
self.moxed_client.update_floatingip(
867
fip_id, {'floatingip': {'port_id': self.fip_associated['port_id'],
868
'fixed_ip_address': fixed_address}})
869
self._setup_mock_for_refresh_cache(api)
# NOTE(review): embedded numbering skips 870-871 here; a statement (presumably
# the mox replay call) may be missing from this excerpt -- confirm.
872
api.associate_floating_ip(self.context, self.instance,
873
address, fixed_address)
875
def test_associate_floating_ip_not_found_fixed_ip(self):
# Expects associate_floating_ip() to raise FixedIpNotFoundForAddress when
# the instance's port listing returns self.port_data2[0] while the requested
# fixed address is taken from the associated floating IP fixture.
876
api = quantumapi.API()
877
address = self.fip_associated['floating_ip_address']
878
fixed_address = self.fip_associated['fixed_ip_address']
# NOTE(review): fip_id is assigned but not referenced in the visible lines.
879
fip_id = self.fip_associated['id']
881
search_opts = {'device_owner': 'compute:nova',
882
'device_id': self.instance['uuid']}
883
self.moxed_client.list_ports(**search_opts).\
884
AndReturn({'ports': [self.port_data2[0]]})
887
self.assertRaises(exception.FixedIpNotFoundForAddress,
888
api.associate_floating_ip, self.context,
889
self.instance, address, fixed_address)
891
def test_disassociate_floating_ip(self):
# Disassociating a floating IP: the floating IP is looked up by address and
# an update setting its port_id to None is expected, followed by the
# cache-refresh expectations.
892
api = quantumapi.API()
893
address = self.fip_associated['floating_ip_address']
894
fip_id = self.fip_associated['id']
896
self.moxed_client.list_floatingips(floating_ip_address=address).\
897
AndReturn({'floatingips': [self.fip_associated]})
898
self.moxed_client.update_floatingip(
899
fip_id, {'floatingip': {'port_id': None}})
900
self._setup_mock_for_refresh_cache(api)
# NOTE(review): embedded numbering skips 901-902 here; a statement (presumably
# the mox replay call) may be missing from this excerpt -- confirm.
903
api.disassociate_floating_ip(self.context, self.instance, address)
906
class TestQuantumv2ModuleMethods(test.TestCase):
# Tests for the module-level helper
# quantumapi._ensure_requested_network_ordering.
# NOTE(review): every call to the helper below is cut off after its opening
# parenthesis (the embedded numbering has gaps), so the arguments passed are
# not visible in this excerpt.
907
def test_ensure_requested_network_ordering_no_preference(self):
910
quantumapi._ensure_requested_network_ordering(
915
# NOTE(review): this method has the same name as the one defined just above.
# Within one class body the later definition replaces the earlier one, so the
# first test would never run. The two should get distinct names.
def test_ensure_requested_network_ordering_no_preference(self):
916
l = [{'id': 3}, {'id': 1}, {'id': 2}]
918
quantumapi._ensure_requested_network_ordering(
# With no preference, the list is expected to keep its original order.
923
self.assertEqual(l, [{'id': 3}, {'id': 1}, {'id': 2}])
925
def test_ensure_requested_network_ordering_with_preference(self):
926
l = [{'id': 3}, {'id': 1}, {'id': 2}]
928
quantumapi._ensure_requested_network_ordering(
# With a preference, the same list object is expected to end up ordered by
# id 1, 2, 3 (the assertion inspects `l` itself, not a return value).
933
self.assertEqual(l, [{'id': 1}, {'id': 2}, {'id': 3}])