~ubuntu-branches/ubuntu/utopic/keystone/utopic

« back to all changes in this revision

Viewing changes to tests/test_sql_upgrade.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Adam Gandelman, Yolanda Robla, Chuck Short, James Page
  • Date: 2013-07-19 09:25:36 UTC
  • mfrom: (1.1.35)
  • Revision ID: package-import@ubuntu.com-20130719092536-nkryufi44y40mesn
Tags: 1:2013.2~b2-0ubuntu1
[ Adam Gandelman ]
* debian/patches/*: Refresh
* debian/keystone.install: Install keystone-paste.ini to /etc/keystone.
  (LP: #1190038)

[ Yolanda Robla ]
* debian/tests: added autopkgtests.

[ Chuck Short ]
* New upstream release.

[ James Page ]
* d/control: Update VCS fields for new branch locations.

Show diffs side-by-side

added added

removed removed

Lines of Context:
32
32
from migrate.versioning import api as versioning_api
33
33
import sqlalchemy
34
34
 
 
35
from keystone import test
 
36
 
35
37
from keystone.common import sql
36
38
from keystone.common.sql import migration
37
39
from keystone import config
38
 
from keystone import test
39
40
 
40
41
import default_fixtures
41
42
 
50
51
        self.metadata = sqlalchemy.MetaData()
51
52
        self.metadata.bind = self.engine
52
53
 
 
54
    _config_file_list = [test.etcdir('keystone.conf.sample'),
 
55
                         test.testsdir('test_overrides.conf'),
 
56
                         test.testsdir('backend_sql.conf')]
 
57
 
 
58
    # override this to specify the complete list of configuration files
 
59
    def config_files(self):
 
60
        return self._config_file_list
 
61
 
53
62
    def setUp(self):
54
63
        super(SqlUpgradeTests, self).setUp()
55
 
        self.config([test.etcdir('keystone.conf.sample'),
56
 
                     test.testsdir('test_overrides.conf'),
57
 
                     test.testsdir('backend_sql.conf')])
 
64
 
 
65
        self.config(self.config_files())
58
66
        self.base = sql.Base()
59
67
 
60
68
        # create and share a single sqlalchemy engine for testing
498
506
 
499
507
    def test_downgrade_to_0(self):
500
508
        self.upgrade(self.max_version)
 
509
 
 
510
        if self.engine.name == 'mysql':
 
511
            self._mysql_check_all_tables_innodb()
 
512
 
501
513
        self.downgrade(0)
502
514
        for table_name in ["user", "token", "role", "user_tenant_membership",
503
515
                           "metadata"]:
589
601
        user_project_metadata_table = sqlalchemy.Table(
590
602
            'user_project_metadata', self.metadata, autoload=True)
591
603
 
592
 
        r = session.execute('select data from metadata where '
593
 
                            'user_id=:user and tenant_id=:tenant',
594
 
                            {'user': user['id'], 'tenant': project['id']})
 
604
        s = sqlalchemy.select([metadata_table.c.data]).where(
 
605
            (metadata_table.c.user_id == user['id']) &
 
606
            (metadata_table.c.tenant_id == project['id']))
 
607
        r = session.execute(s)
595
608
        test_project1 = json.loads(r.fetchone()['data'])
596
609
        self.assertEqual(len(test_project1['roles']), 1)
597
610
        self.assertIn(role['id'], test_project1['roles'])
598
611
 
599
612
        # Test user in project2 has role2
600
 
        r = session.execute('select data from metadata where '
601
 
                            'user_id=:user and tenant_id=:tenant',
602
 
                            {'user': user['id'], 'tenant': project2['id']})
 
613
        s = sqlalchemy.select([metadata_table.c.data]).where(
 
614
            (metadata_table.c.user_id == user['id']) &
 
615
            (metadata_table.c.tenant_id == project2['id']))
 
616
        r = session.execute(s)
603
617
        test_project2 = json.loads(r.fetchone()['data'])
604
618
        self.assertEqual(len(test_project2['roles']), 1)
605
619
        self.assertIn(role2['id'], test_project2['roles'])
607
621
        # Test for user in project has role in user_project_metadata
608
622
        # Migration 17 does not properly migrate this data, so this should
609
623
        # be None.
610
 
        r = session.execute('select data from user_project_metadata where '
611
 
                            'user_id=:user and project_id=:project',
612
 
                            {'user': user['id'], 'project': project['id']})
 
624
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
625
            (user_project_metadata_table.c.user_id == user['id']) &
 
626
            (user_project_metadata_table.c.project_id == project['id']))
 
627
        r = session.execute(s)
613
628
        self.assertIsNone(r.fetchone())
614
629
 
615
630
        # Create a conflicting user-project in user_project_metadata with
630
645
        # The user-project pairs should have all roles from the previous
631
646
        # metadata table in addition to any roles currently in
632
647
        # user_project_metadata
633
 
        r = session.execute('select data from user_project_metadata where '
634
 
                            'user_id=:user and project_id=:project',
635
 
                            {'user': user['id'], 'project': project['id']})
 
648
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
649
            (user_project_metadata_table.c.user_id == user['id']) &
 
650
            (user_project_metadata_table.c.project_id == project['id']))
 
651
        r = session.execute(s)
636
652
        role_ids = json.loads(r.fetchone()['data'])['roles']
637
653
        self.assertEqual(len(role_ids), 3)
638
654
        self.assertIn(CONF.member_role_id, role_ids)
641
657
 
642
658
        # pairs that only existed in old metadata table should be in
643
659
        # user_project_metadata
644
 
        r = session.execute('select data from user_project_metadata where '
645
 
                            'user_id=:user and project_id=:project',
646
 
                            {'user': user['id'], 'project': project2['id']})
 
660
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
661
            (user_project_metadata_table.c.user_id == user['id']) &
 
662
            (user_project_metadata_table.c.project_id == project2['id']))
 
663
        r = session.execute(s)
647
664
        role_ids = json.loads(r.fetchone()['data'])['roles']
648
665
        self.assertEqual(len(role_ids), 2)
649
666
        self.assertIn(CONF.member_role_id, role_ids)
807
824
        self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)
808
825
        self.assertEqual(ref.extra, '{}')
809
826
 
 
827
    def test_group_project_FK_fixup(self):
 
828
        # To create test data we must start before we broke the
 
829
        # group_project_metadata table in 015.
 
830
        self.upgrade(14)
 
831
        session = self.Session()
 
832
 
 
833
        domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
 
834
        group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
 
835
        tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)
 
836
        role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
 
837
        group_project_metadata_table = sqlalchemy.Table(
 
838
            'group_project_metadata', self.metadata, autoload=True)
 
839
 
 
840
        # Create a Domain
 
841
        domain = {'id': uuid.uuid4().hex,
 
842
                  'name': uuid.uuid4().hex,
 
843
                  'enabled': True}
 
844
        session.execute(domain_table.insert().values(domain))
 
845
 
 
846
        # Create two Tenants
 
847
        tenant = {'id': uuid.uuid4().hex,
 
848
                  'name': uuid.uuid4().hex,
 
849
                  'extra': "{}"}
 
850
        session.execute(tenant_table.insert().values(tenant))
 
851
 
 
852
        tenant1 = {'id': uuid.uuid4().hex,
 
853
                   'name': uuid.uuid4().hex,
 
854
                   'extra': "{}"}
 
855
        session.execute(tenant_table.insert().values(tenant1))
 
856
 
 
857
        # Create a Group
 
858
        group = {'id': uuid.uuid4().hex,
 
859
                 'name': uuid.uuid4().hex,
 
860
                 'domain_id': domain['id'],
 
861
                 'extra': json.dumps({})}
 
862
        session.execute(group_table.insert().values(group))
 
863
 
 
864
        # Create roles
 
865
        role_list = []
 
866
        for _ in range(2):
 
867
            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
 
868
            session.execute(role_table.insert().values(role))
 
869
            role_list.append(role)
 
870
 
 
871
        # Grant Role to Group on Project
 
872
        role_grant = {'group_id': group['id'],
 
873
                      'project_id': tenant['id'],
 
874
                      'data': json.dumps({'roles': [role_list[0]['id']]})}
 
875
        session.execute(
 
876
            group_project_metadata_table.insert().values(role_grant))
 
877
 
 
878
        role_grant = {'group_id': group['id'],
 
879
                      'project_id': tenant1['id'],
 
880
                      'data': json.dumps({'roles': [role_list[1]['id']]})}
 
881
        session.execute(
 
882
            group_project_metadata_table.insert().values(role_grant))
 
883
 
 
884
        session.commit()
 
885
 
 
886
        # Now upgrade and fix up the FKs
 
887
        self.upgrade(28)
 
888
        self.assertTableExists('group_project_metadata')
 
889
        self.assertTableExists('project')
 
890
        self.assertTableDoesNotExist('tenant')
 
891
 
 
892
        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
 
893
            (group_project_metadata_table.c.group_id == group['id']) &
 
894
            (group_project_metadata_table.c.project_id == tenant['id']))
 
895
        r = session.execute(s)
 
896
        data = json.loads(r.fetchone()['data'])
 
897
        self.assertEqual(len(data['roles']), 1)
 
898
        self.assertIn(role_list[0]['id'], data['roles'])
 
899
 
 
900
        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
 
901
            (group_project_metadata_table.c.group_id == group['id']) &
 
902
            (group_project_metadata_table.c.project_id == tenant1['id']))
 
903
        r = session.execute(s)
 
904
        data = json.loads(r.fetchone()['data'])
 
905
        self.assertEqual(len(data['roles']), 1)
 
906
        self.assertIn(role_list[1]['id'], data['roles'])
 
907
 
 
908
        self.downgrade(27)
 
909
        self.assertTableExists('group_project_metadata')
 
910
        self.assertTableExists('project')
 
911
        self.assertTableDoesNotExist('tenant')
 
912
 
 
913
    def test_assignment_metadata_migration(self):
 
914
        self.upgrade(28)
 
915
        # Scaffolding
 
916
        session = self.Session()
 
917
 
 
918
        domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
 
919
        user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
 
920
        group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
 
921
        role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
 
922
        project_table = sqlalchemy.Table(
 
923
            'project', self.metadata, autoload=True)
 
924
        user_project_metadata_table = sqlalchemy.Table(
 
925
            'user_project_metadata', self.metadata, autoload=True)
 
926
        user_domain_metadata_table = sqlalchemy.Table(
 
927
            'user_domain_metadata', self.metadata, autoload=True)
 
928
        group_project_metadata_table = sqlalchemy.Table(
 
929
            'group_project_metadata', self.metadata, autoload=True)
 
930
        group_domain_metadata_table = sqlalchemy.Table(
 
931
            'group_domain_metadata', self.metadata, autoload=True)
 
932
 
 
933
        # Create a Domain
 
934
        domain = {'id': uuid.uuid4().hex,
 
935
                  'name': uuid.uuid4().hex,
 
936
                  'enabled': True}
 
937
        session.execute(domain_table.insert().values(domain))
 
938
 
 
939
        # Create another Domain
 
940
        domain2 = {'id': uuid.uuid4().hex,
 
941
                   'name': uuid.uuid4().hex,
 
942
                   'enabled': True}
 
943
        session.execute(domain_table.insert().values(domain2))
 
944
 
 
945
        # Create a Project
 
946
        project = {'id': uuid.uuid4().hex,
 
947
                   'name': uuid.uuid4().hex,
 
948
                   'domain_id': domain['id'],
 
949
                   'extra': "{}"}
 
950
        session.execute(project_table.insert().values(project))
 
951
 
 
952
        # Create another Project
 
953
        project2 = {'id': uuid.uuid4().hex,
 
954
                    'name': uuid.uuid4().hex,
 
955
                    'domain_id': domain['id'],
 
956
                    'extra': "{}"}
 
957
        session.execute(project_table.insert().values(project2))
 
958
 
 
959
        # Create a User
 
960
        user = {'id': uuid.uuid4().hex,
 
961
                'name': uuid.uuid4().hex,
 
962
                'domain_id': domain['id'],
 
963
                'password': uuid.uuid4().hex,
 
964
                'enabled': True,
 
965
                'extra': json.dumps({})}
 
966
        session.execute(user_table.insert().values(user))
 
967
 
 
968
        # Create a Group
 
969
        group = {'id': uuid.uuid4().hex,
 
970
                 'name': uuid.uuid4().hex,
 
971
                 'domain_id': domain['id'],
 
972
                 'extra': json.dumps({})}
 
973
        session.execute(group_table.insert().values(group))
 
974
 
 
975
        # Create roles
 
976
        role_list = []
 
977
        for _ in range(7):
 
978
            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
 
979
            session.execute(role_table.insert().values(role))
 
980
            role_list.append(role)
 
981
 
 
982
        # Grant Role to User on Project
 
983
        role_grant = {'user_id': user['id'],
 
984
                      'project_id': project['id'],
 
985
                      'data': json.dumps({'roles': [role_list[0]['id']]})}
 
986
        session.execute(
 
987
            user_project_metadata_table.insert().values(role_grant))
 
988
 
 
989
        role_grant = {'user_id': user['id'],
 
990
                      'project_id': project2['id'],
 
991
                      'data': json.dumps({'roles': [role_list[1]['id']]})}
 
992
        session.execute(
 
993
            user_project_metadata_table.insert().values(role_grant))
 
994
 
 
995
        # Grant Role to Group on different Project
 
996
        role_grant = {'group_id': group['id'],
 
997
                      'project_id': project2['id'],
 
998
                      'data': json.dumps({'roles': [role_list[2]['id']]})}
 
999
        session.execute(
 
1000
            group_project_metadata_table.insert().values(role_grant))
 
1001
 
 
1002
        # Grant Role to User on Domain
 
1003
        role_grant = {'user_id': user['id'],
 
1004
                      'domain_id': domain['id'],
 
1005
                      'data': json.dumps({'roles': [role_list[3]['id']]})}
 
1006
        session.execute(user_domain_metadata_table.insert().values(role_grant))
 
1007
 
 
1008
        # Grant Role to Group on Domain
 
1009
        role_grant = {'group_id': group['id'],
 
1010
                      'domain_id': domain['id'],
 
1011
                      'data': json.dumps(
 
1012
                          {'roles': [role_list[4]['id']],
 
1013
                           'other': 'somedata'})}
 
1014
        session.execute(
 
1015
            group_domain_metadata_table.insert().values(role_grant))
 
1016
 
 
1017
        session.commit()
 
1018
 
 
1019
        self.upgrade(29)
 
1020
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
1021
            (user_project_metadata_table.c.user_id == user['id']) &
 
1022
            (user_project_metadata_table.c.project_id == project['id']))
 
1023
        r = session.execute(s)
 
1024
        data = json.loads(r.fetchone()['data'])
 
1025
        self.assertEqual(len(data['roles']), 1)
 
1026
        self.assertIn({'id': role_list[0]['id']}, data['roles'])
 
1027
 
 
1028
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
1029
            (user_project_metadata_table.c.user_id == user['id']) &
 
1030
            (user_project_metadata_table.c.project_id == project2['id']))
 
1031
        r = session.execute(s)
 
1032
        data = json.loads(r.fetchone()['data'])
 
1033
        self.assertEqual(len(data['roles']), 1)
 
1034
        self.assertIn({'id': role_list[1]['id']}, data['roles'])
 
1035
 
 
1036
        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
 
1037
            (group_project_metadata_table.c.group_id == group['id']) &
 
1038
            (group_project_metadata_table.c.project_id == project2['id']))
 
1039
        r = session.execute(s)
 
1040
        data = json.loads(r.fetchone()['data'])
 
1041
        self.assertEqual(len(data['roles']), 1)
 
1042
        self.assertIn({'id': role_list[2]['id']}, data['roles'])
 
1043
 
 
1044
        s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
 
1045
            (user_domain_metadata_table.c.user_id == user['id']) &
 
1046
            (user_domain_metadata_table.c.domain_id == domain['id']))
 
1047
        r = session.execute(s)
 
1048
        data = json.loads(r.fetchone()['data'])
 
1049
        self.assertEqual(len(data['roles']), 1)
 
1050
        self.assertIn({'id': role_list[3]['id']}, data['roles'])
 
1051
 
 
1052
        s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
 
1053
            (group_domain_metadata_table.c.group_id == group['id']) &
 
1054
            (group_domain_metadata_table.c.domain_id == domain['id']))
 
1055
        r = session.execute(s)
 
1056
        data = json.loads(r.fetchone()['data'])
 
1057
        self.assertEqual(len(data['roles']), 1)
 
1058
        self.assertIn({'id': role_list[4]['id']}, data['roles'])
 
1059
        self.assertIn('other', data)
 
1060
 
 
1061
        # Now add an entry that has one regular and one inherited role
 
1062
        role_grant = {'user_id': user['id'],
 
1063
                      'domain_id': domain2['id'],
 
1064
                      'data': json.dumps(
 
1065
                          {'roles': [{'id': role_list[5]['id']},
 
1066
                                     {'id': role_list[6]['id'],
 
1067
                                      'inherited_to': 'projects'}]})}
 
1068
        session.execute(user_domain_metadata_table.insert().values(role_grant))
 
1069
 
 
1070
        session.commit()
 
1071
        self.downgrade(28)
 
1072
 
 
1073
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
1074
            (user_project_metadata_table.c.user_id == user['id']) &
 
1075
            (user_project_metadata_table.c.project_id == project['id']))
 
1076
        r = session.execute(s)
 
1077
        data = json.loads(r.fetchone()['data'])
 
1078
        self.assertEqual(len(data['roles']), 1)
 
1079
        self.assertIn(role_list[0]['id'], data['roles'])
 
1080
 
 
1081
        s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
 
1082
            (user_project_metadata_table.c.user_id == user['id']) &
 
1083
            (user_project_metadata_table.c.project_id == project2['id']))
 
1084
        r = session.execute(s)
 
1085
        data = json.loads(r.fetchone()['data'])
 
1086
        self.assertEqual(len(data['roles']), 1)
 
1087
        self.assertIn(role_list[1]['id'], data['roles'])
 
1088
 
 
1089
        s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
 
1090
            (group_project_metadata_table.c.group_id == group['id']) &
 
1091
            (group_project_metadata_table.c.project_id == project2['id']))
 
1092
        r = session.execute(s)
 
1093
        data = json.loads(r.fetchone()['data'])
 
1094
        self.assertEqual(len(data['roles']), 1)
 
1095
        self.assertIn(role_list[2]['id'], data['roles'])
 
1096
 
 
1097
        s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
 
1098
            (user_domain_metadata_table.c.user_id == user['id']) &
 
1099
            (user_domain_metadata_table.c.domain_id == domain['id']))
 
1100
        r = session.execute(s)
 
1101
        data = json.loads(r.fetchone()['data'])
 
1102
        self.assertEqual(len(data['roles']), 1)
 
1103
        self.assertIn(role_list[3]['id'], data['roles'])
 
1104
 
 
1105
        s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
 
1106
            (group_domain_metadata_table.c.group_id == group['id']) &
 
1107
            (group_domain_metadata_table.c.domain_id == domain['id']))
 
1108
        r = session.execute(s)
 
1109
        data = json.loads(r.fetchone()['data'])
 
1110
        self.assertEqual(len(data['roles']), 1)
 
1111
        self.assertIn(role_list[4]['id'], data['roles'])
 
1112
        self.assertIn('other', data)
 
1113
 
 
1114
        # For user-domain2, where we had one regular and one inherited role,
 
1115
        # only the direct role should remain; the inherited role should
 
1116
        # have been deleted during the downgrade
 
1117
        s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
 
1118
            (user_domain_metadata_table.c.user_id == user['id']) &
 
1119
            (user_domain_metadata_table.c.domain_id == domain2['id']))
 
1120
        r = session.execute(s)
 
1121
        data = json.loads(r.fetchone()['data'])
 
1122
        self.assertEqual(len(data['roles']), 1)
 
1123
        self.assertIn(role_list[5]['id'], data['roles'])
 
1124
 
810
1125
    def populate_user_table(self, with_pass_enab=False,
811
1126
                            with_pass_enab_domain=False):
812
1127
        # Populate the appropriate fields in the user
948
1263
        for ver, change in changeset:
949
1264
            self.schema.runchange(ver, change, changeset.step)
950
1265
        self.assertEqual(self.schema.version, version)
 
1266
 
 
1267
    def _mysql_check_all_tables_innodb(self):
 
1268
        database = self.engine.url.database
 
1269
 
 
1270
        connection = self.engine.connect()
 
1271
        # sanity check
 
1272
        total = connection.execute("SELECT count(*) "
 
1273
                                   "from information_schema.TABLES "
 
1274
                                   "where TABLE_SCHEMA='%(database)s'" %
 
1275
                                   locals())
 
1276
        self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
 
1277
 
 
1278
        noninnodb = connection.execute("SELECT table_name "
 
1279
                                       "from information_schema.TABLES "
 
1280
                                       "where TABLE_SCHEMA='%(database)s' "
 
1281
                                       "and ENGINE!='InnoDB' "
 
1282
                                       "and TABLE_NAME!='migrate_version'" %
 
1283
                                       locals())
 
1284
        names = [x[0] for x in noninnodb]
 
1285
        self.assertEqual(names, [],
 
1286
                         "Non-InnoDB tables exist")
 
1287
 
 
1288
        connection.close()