# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2013 Boris Pavlovic (boris@pavlovic.me).
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
18
from migrate.changeset import UniqueConstraint
19
from sqlalchemy import Integer, BigInteger, DateTime, String
20
from sqlalchemy import MetaData, Table, Column
21
from sqlalchemy.sql import select
23
from nova.db.sqlalchemy import utils
24
from nova import exception
25
from nova.tests import test_migrations
28
class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
    """Class for testing utils that are used in db migrations.

    Every test creates a scratch table on each engine in ``self.engines``,
    runs one helper from ``nova.db.sqlalchemy.utils`` against it and then
    inspects the rows (and, for the constraint tests, the reflected
    constraints) that are left.
    """

    def _assert_unique_constraint_dropped(self, engine, table_name, uc_name,
                                          test_table, values):
        """Verify the state of the table after a unique constraint drop.

        :param engine: engine the table lives on.
        :param table_name: name used to reflect the table from the DB.
        :param uc_name: name of the constraint that must be gone.
        :param test_table: Table object used to select the rows back.
        :param values: list of dicts (keys 'id', 'a', 'foo') that were
                       inserted, listed in ascending 'id' order.
        """
        s = test_table.select().order_by(test_table.c.id)
        rows = engine.execute(s).fetchall()

        # zip() stops at the shorter input, so check the row count first;
        # otherwise a lost row would go unnoticed.
        self.assertEqual(len(values), len(rows))
        for v, row in zip(values, rows):
            self.assertEqual((v['id'], v['a'], v['foo']), row)

        # NOTE(boris-42): Update data about Table from DB.
        meta = MetaData()
        meta.bind = engine
        reflected = Table(table_name, meta, autoload=True)
        constraints = [c for c in reflected.constraints
                       if c.name == uc_name]
        self.assertEqual(len(constraints), 0)
        self.assertEqual(len(reflected.constraints), 1)

    @staticmethod
    def _split_duplicates(values):
        """Partition `values` by their ('b', 'c') pair.

        Rows are visited from the highest 'id' to the lowest. The first row
        seen for a given (b, c) pair is kept; every later row with the same
        pair is reported as a duplicate.

        :returns: tuple ``(kept, duplicates)`` of lists of value dicts.
        """
        seen = set()
        kept = []
        duplicates = []
        for value in sorted(values, key=lambda x: x['id'], reverse=True):
            uniq_value = (('b', value['b']), ('c', value['c']))
            if uniq_value in seen:
                duplicates.append(value)
                continue
            seen.add(uniq_value)
            kept.append(value)
        return kept, duplicates

    def test_utils_drop_unique_constraint(self):
        """Dropping one named UC must keep all rows untouched."""
        table_name = "__test_tmp_table__"
        # NOTE(review): the constraint name was missing from the damaged
        # source; it only has to differ from 'uniq_a' -- confirm upstream.
        uc_name = 'uniq_foo'
        values = [
            {'id': 1, 'a': 3, 'foo': 10},
            {'id': 2, 'a': 2, 'foo': 20},
            {'id': 3, 'a': 1, 'foo': 30},
        ]

        for engine in self.engines.values():
            meta = MetaData()
            meta.bind = engine
            # NOTE(review): `nullable=False` and the 'a' column were
            # reconstructed (the UC on 'a' and the inserted values need
            # the column) -- confirm upstream.
            test_table = Table(table_name, meta,
                               Column('id', Integer, primary_key=True,
                                      nullable=False),
                               Column('a', Integer),
                               Column('foo', Integer),
                               UniqueConstraint('a', name='uniq_a'),
                               UniqueConstraint('foo', name=uc_name))
            test_table.create()

            engine.execute(test_table.insert(), values)
            # NOTE(boris-42): This method is generic UC dropper.
            utils.drop_unique_constraint(engine, table_name, uc_name, 'foo')

            self._assert_unique_constraint_dropped(engine, table_name,
                                                   uc_name, test_table,
                                                   values)

            # All tests reuse the same table name, so remove the scratch
            # table before the next engine/test creates it again.
            test_table.drop()

    def test_util_drop_unique_constraint_with_not_supported_sqlite_type(self):
        """Drop a UC on a table that has a BigInteger column.

        On sqlite the helper is expected to raise NovaException unless it
        is handed a proper Column instance describing `foo`.
        """
        table_name = "__test_tmp_table__"
        # NOTE(review): the constraint name was missing from the damaged
        # source; it only has to differ from 'uniq_a' -- confirm upstream.
        uc_name = 'uniq_foo'
        values = [
            {'id': 1, 'a': 3, 'foo': 10},
            {'id': 2, 'a': 2, 'foo': 20},
            {'id': 3, 'a': 1, 'foo': 30},
        ]

        for engine in self.engines.values():
            meta = MetaData()
            meta.bind = engine
            test_table = Table(table_name, meta,
                               Column('id', Integer, primary_key=True,
                                      nullable=False),
                               Column('a', Integer),
                               Column('foo', BigInteger, default=0),
                               UniqueConstraint('a', name='uniq_a'),
                               UniqueConstraint('foo', name=uc_name))
            test_table.create()

            engine.execute(test_table.insert(), values)

            # NOTE(review): guard reconstructed from the test name. Without
            # it, a successful drop on another backend would fail the
            # assertRaises checks -- confirm upstream.
            if engine.name == 'sqlite':
                # NOTE(boris-42): Missing info about column `foo` that has
                #                 unsupported type BigInteger.
                self.assertRaises(exception.NovaException,
                                  utils.drop_unique_constraint,
                                  engine, table_name, uc_name, 'foo')

                # NOTE(boris-42): Wrong type of foo instance. it should be
                #                 instance of sqlalchemy.Column.
                # NOTE(review): the `foo=` keyword was missing from the
                # damaged source; a bare type instance matches the note
                # above -- confirm upstream.
                self.assertRaises(exception.NovaException,
                                  utils.drop_unique_constraint,
                                  engine, table_name, uc_name, 'foo',
                                  foo=Integer())

            foo = Column('foo', BigInteger, default=0)
            utils.drop_unique_constraint(engine, table_name, uc_name, 'foo',
                                         foo=foo)

            self._assert_unique_constraint_dropped(engine, table_name,
                                                   uc_name, test_table,
                                                   values)

            # All tests reuse the same table name, so remove the scratch
            # table before the next engine/test creates it again.
            test_table.drop()

    def _populate_db_for_drop_duplicate_entries(self, engine, meta,
                                                table_name):
        """Create `table_name` and fill it with rows duplicated on (b, c).

        :param engine: engine used to run the insert.
        :param meta: MetaData the new Table is attached to.
        :param table_name: name of the table to create.
        :returns: tuple ``(test_table, values)`` with the created Table and
                  the list of inserted row dicts.
        """
        values = [
            {'id': 11, 'a': 3, 'b': 10, 'c': 'abcdef'},
            {'id': 12, 'a': 5, 'b': 10, 'c': 'abcdef'},
            {'id': 13, 'a': 6, 'b': 10, 'c': 'abcdef'},
            {'id': 14, 'a': 7, 'b': 10, 'c': 'abcdef'},
            {'id': 21, 'a': 1, 'b': 20, 'c': 'aa'},
            {'id': 31, 'a': 1, 'b': 20, 'c': 'bb'},
            {'id': 41, 'a': 1, 'b': 30, 'c': 'aef'},
            {'id': 42, 'a': 2, 'b': 30, 'c': 'aef'},
            {'id': 43, 'a': 3, 'b': 30, 'c': 'aef'},
        ]

        # NOTE(review): the 'c' column definition was missing from the
        # damaged source; String(255) is assumed from the string values
        # inserted above -- confirm upstream.
        test_table = Table(table_name, meta,
                           Column('id', Integer, primary_key=True,
                                  nullable=False),
                           Column('a', Integer),
                           Column('b', Integer),
                           Column('c', String(255)),
                           Column('deleted', Integer, default=0),
                           Column('deleted_at', DateTime),
                           Column('updated_at', DateTime))
        test_table.create()

        engine.execute(test_table.insert(), values)
        return test_table, values

    def test_drop_old_duplicate_entries_from_table(self):
        """Only the newest row of each (b, c) group must stay in the table."""
        table_name = "__test_tmp_table__"

        for engine in self.engines.values():
            meta = MetaData()
            meta.bind = engine
            test_table, values = self.\
                _populate_db_for_drop_duplicate_entries(engine, meta,
                                                        table_name)

            # NOTE(review): the trailing arguments were missing from the
            # damaged source. `False, 'b', 'c'` (no soft delete, unique on
            # b + c) is inferred from the checks below -- confirm against
            # the helper's signature.
            utils.drop_old_duplicate_entries_from_table(engine, table_name,
                                                        False, 'b', 'c')

            kept, _duplicates = self._split_duplicates(values)
            expected_ids = [value['id'] for value in kept]

            real_ids = [row[0] for row in
                        engine.execute(select([test_table.c.id])).fetchall()]

            self.assertEqual(len(real_ids), len(expected_ids))
            for id_ in expected_ids:
                self.assertIn(id_, real_ids)

            # All tests reuse the same table name, so remove the scratch
            # table before the next engine/test creates it again.
            test_table.drop()

    def test_drop_old_duplicate_entries_from_table_soft_delete(self):
        """Duplicates must be marked deleted (deleted == id), not removed."""
        table_name = "__test_tmp_table__"

        for engine in self.engines.values():
            meta = MetaData()
            meta.bind = engine
            table, values = self.\
                _populate_db_for_drop_duplicate_entries(engine, meta,
                                                        table_name)

            # NOTE(review): the trailing arguments were missing from the
            # damaged source. `True, 'b', 'c'` (soft delete, unique on
            # b + c) is inferred from the checks below -- confirm against
            # the helper's signature.
            utils.drop_old_duplicate_entries_from_table(engine, table_name,
                                                        True, 'b', 'c')

            expected_values, soft_deleted_values = \
                self._split_duplicates(values)

            base_select = table.select()

            # Rows that were left alone: `deleted` differs from `id`.
            rows_select = base_select.\
                where(table.c.deleted != table.c.id)
            row_ids = [row['id'] for row in
                       engine.execute(rows_select).fetchall()]
            self.assertEqual(len(row_ids), len(expected_values))
            for value in expected_values:
                self.assertIn(value['id'], row_ids)

            # Soft-deleted rows: `deleted` was set to the row's own `id`.
            deleted_rows_select = base_select.\
                where(table.c.deleted == table.c.id)
            deleted_rows_ids = [row['id'] for row in
                                engine.execute(deleted_rows_select).fetchall()]
            self.assertEqual(len(deleted_rows_ids),
                             len(values) - len(row_ids))
            for value in soft_deleted_values:
                self.assertIn(value['id'], deleted_rows_ids)

            # All tests reuse the same table name, so remove the scratch
            # table before the next engine/test creates it again.
            table.drop()