# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2013 Boris Pavlovic (boris@pavlovic.me).
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from migrate.changeset import UniqueConstraint
19
from sqlalchemy.engine import reflection
20
from sqlalchemy.ext.compiler import compiles
21
from sqlalchemy import func
22
from sqlalchemy import MetaData, Table, Column, Index
23
from sqlalchemy.sql.expression import UpdateBase, literal_column
24
from sqlalchemy.sql import select
25
from sqlalchemy.types import NullType
28
from nova import exception
29
from nova.openstack.common import log as logging
30
from nova.openstack.common import timeutils
33
# Module-level logger; used below to report each duplicated row that is
# selected for removal.
LOG = logging.getLogger(__name__)
36
class InsertFromSelect(UpdateBase):
    """SQL construct representing ``INSERT INTO <table> <select>``.

    The class only stores the two pieces that the compiler hook registered
    for it reads back: the target ``table`` and the ``select`` feeding it.
    """

    def __init__(self, table, select):
        """Remember the target table and the SELECT that supplies the rows.

        :param table: table the selected rows are inserted into.
        :param select: selectable producing the rows to insert.
        """
        self.table = table
        self.select = select
42
@compiles(InsertFromSelect)
def visit_insert_from_select(element, compiler, **kw):
    """Render an InsertFromSelect element as an INSERT ... SELECT string.

    :param element: InsertFromSelect instance being compiled.
    :param compiler: SQL compiler used to render the table and the select.
    :param kw: extra compiler keyword arguments (not used here).
    :returns: string of the form ``INSERT INTO <table> <select>``.
    """
    return "INSERT INTO %s %s" % (
        compiler.process(element.table, asfrom=True),
        compiler.process(element.select))
49
def _drop_unique_constraint_in_sqlite(migrate_engine, table_name, uc_name,
                                      **col_name_col_instance):
    """Drop a unique constraint on sqlite by rebuilding the table.

    A temporary table named ``<table_name>__tmp__`` is built with the same
    columns and constraints (minus the one called ``uc_name``), the data is
    copied over, the old table is dropped, the indexes are recreated and
    the temporary table is renamed to ``table_name``.

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of the table that owns the unique constraint.
    :param uc_name: name of the unique constraint to leave out.
    :param col_name_col_instance: column_name=column_instance pairs for the
                                  columns whose reflected type is NullType.
    :raises NovaException: if a NullType column has no replacement in
                           col_name_col_instance, or the replacement is not
                           a sqlalchemy Column.
    """
    insp = reflection.Inspector.from_engine(migrate_engine)
    meta = MetaData(bind=migrate_engine)

    table = Table(table_name, meta, autoload=True)

    columns = []
    for column in table.columns:
        if isinstance(column.type, NullType):
            # The reflected type is unusable, so the caller has to provide
            # a ready-made Column. dict.get() never raises, hence the
            # explicit membership test to report a missing column properly.
            if column.name not in col_name_col_instance:
                msg = _("Please specify column %s in col_name_col_instance "
                        "param. It is required because column has unsupported "
                        "type by sqlite.")
                raise exception.NovaException(msg % column.name)

            new_column = col_name_col_instance[column.name]
            if not isinstance(new_column, Column):
                msg = _("col_name_col_instance param has wrong type of "
                        "column instance for column %s It should be instance "
                        "of sqlalchemy.Column.")
                raise exception.NovaException(msg % column.name)
            columns.append(new_column)
        else:
            columns.append(column.copy())

    # Keep every constraint except the one being dropped.
    constraints = [constraint for constraint in table.constraints
                   if not constraint.name == uc_name]

    new_table = Table(table_name + "__tmp__", meta, *(columns + constraints))
    new_table.create()

    # Rebuild the index definitions against the new table's columns; they
    # are only created after the old table (and its indexes) is gone.
    indexes = []
    for index in insp.get_indexes(table_name):
        column_names = [new_table.c[c] for c in index['column_names']]
        indexes.append(Index(index["name"],
                             *column_names,
                             unique=index["unique"]))

    ins = InsertFromSelect(new_table, table.select())
    migrate_engine.execute(ins)
    table.drop()

    for index in indexes:
        index.create(migrate_engine)
    new_table.rename(table_name)
96
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
                           **col_name_col_instance):
    """Drop a unique constraint from a table.

    This method drops UC from table and works for mysql, postgresql and sqlite.
    In mysql and postgresql we are able to use "alter table" construction. In
    sqlite there is only one way to drop UC:
        1) Create new table with same columns, indexes and constraints
           (except one that we want to drop).
        2) Copy data from old table to new.
        3) Drop old table.
        4) Rename new table to the name of old table.

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of table that contains uniq constraint.
    :param uc_name: name of uniq constraint that will be dropped.
    :param columns: columns that are in uniq constraint.
    :param col_name_col_instance: contains pair column_name=column_instance.
                            column_instance is instance of Column. These params
                            are required only for columns that have unsupported
                            types by sqlite. For example BigInteger.
    """
    if migrate_engine.name in ["mysql", "postgresql"]:
        meta = MetaData()
        meta.bind = migrate_engine
        t = Table(table_name, meta, autoload=True)
        uc = UniqueConstraint(*columns, table=t, name=uc_name)
        uc.drop()
    else:
        # Every other engine goes through the table-rebuild path.
        _drop_unique_constraint_in_sqlite(migrate_engine, table_name, uc_name,
                                          **col_name_col_instance)
128
def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
                                          use_soft_delete, *uc_column_names):
    """Remove old rows that duplicate each other on the given columns.

    This method is used to drop all old rows that have the same values for
    the columns named in uc_column_names. Within each group of duplicates
    the row with the biggest id is kept.

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of the table to clean up.
    :param use_soft_delete: if true, duplicates are marked as deleted with
                            an UPDATE ('deleted' is set to the row id and
                            'deleted_at' to the current UTC time) instead of
                            being removed with a DELETE.
    :param uc_column_names: names of the columns that define a duplicate.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    table = Table(table_name, meta, autoload=True)
    columns_for_group_by = [table.c[name] for name in uc_column_names]

    columns_for_select = [func.max(table.c.id)] + columns_for_group_by

    duplicated_rows_select = select(columns_for_select,
                                    group_by=columns_for_group_by,
                                    having=func.count(table.c.id) > 1)

    for row in migrate_engine.execute(duplicated_rows_select):
        # NOTE(boris-42): Do not remove row that has the biggest ID.
        delete_condition = table.c.id != row[0]
        for name in uc_column_names:
            delete_condition &= table.c[name] == row[name]

        rows_to_delete_select = select([table.c.id]).where(delete_condition)
        # A distinct loop variable keeps the outer ``row`` intact.
        for doomed in migrate_engine.execute(rows_to_delete_select).fetchall():
            LOG.info(_("Deleted duplicated row with id: %(id)s from table: "
                       "%(table)s") % dict(id=doomed[0], table=table_name))

        if use_soft_delete:
            # 'updated_at' is assigned to itself so the UPDATE leaves its
            # current value in place.
            delete_statement = table.update().\
                where(delete_condition).\
                values({
                    'deleted': literal_column('id'),
                    'updated_at': literal_column('updated_at'),
                    'deleted_at': timeutils.utcnow()
                })
        else:
            delete_statement = table.delete().where(delete_condition)
        migrate_engine.execute(delete_statement)