~canonical-livepatch-dependencies/canonical-livepatch-service-dependencies/alembic

« back to all changes in this revision

Viewing changes to tests/test_batch.py

  • Committer: Free Ekanayaka
  • Author(s): Ondřej Nový
  • Date: 2016-04-21 20:04:57 UTC
  • Revision ID: free.ekanayaka@canonical.com-20160421200457-b4x0zmvn1nhc094p
Tags: upstream-0.8.6
Import upstream version 0.8.6

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from contextlib import contextmanager
 
2
import re
 
3
 
 
4
from alembic.testing import exclusions
 
5
from alembic.testing import assert_raises_message
 
6
from alembic.testing import TestBase, eq_, config
 
7
from alembic.testing.fixtures import op_fixture
 
8
from alembic.testing import mock
 
9
from alembic.operations import Operations
 
10
from alembic.operations.batch import ApplyBatchImpl
 
11
from alembic.runtime.migration import MigrationContext
 
12
 
 
13
 
 
14
from sqlalchemy import Integer, Table, Column, String, MetaData, ForeignKey, \
 
15
    UniqueConstraint, ForeignKeyConstraint, Index, Boolean, CheckConstraint, \
 
16
    Enum
 
17
from sqlalchemy.engine.reflection import Inspector
 
18
from sqlalchemy.sql import column, text
 
19
from sqlalchemy.schema import CreateTable, CreateIndex
 
20
from sqlalchemy import exc
 
21
 
 
22
 
 
23
class BatchApplyTest(TestBase):
 
24
    __requires__ = ('sqlalchemy_08', )
 
25
 
 
26
    def setUp(self):
 
27
        self.op = Operations(mock.Mock(opts={}))
 
28
 
 
29
    def _simple_fixture(self, table_args=(), table_kwargs={}):
 
30
        m = MetaData()
 
31
        t = Table(
 
32
            'tname', m,
 
33
            Column('id', Integer, primary_key=True),
 
34
            Column('x', String(10)),
 
35
            Column('y', Integer)
 
36
        )
 
37
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
38
 
 
39
    def _uq_fixture(self, table_args=(), table_kwargs={}):
 
40
        m = MetaData()
 
41
        t = Table(
 
42
            'tname', m,
 
43
            Column('id', Integer, primary_key=True),
 
44
            Column('x', String()),
 
45
            Column('y', Integer),
 
46
            UniqueConstraint('y', name='uq1')
 
47
        )
 
48
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
49
 
 
50
    def _ix_fixture(self, table_args=(), table_kwargs={}):
 
51
        m = MetaData()
 
52
        t = Table(
 
53
            'tname', m,
 
54
            Column('id', Integer, primary_key=True),
 
55
            Column('x', String()),
 
56
            Column('y', Integer),
 
57
            Index('ix1', 'y')
 
58
        )
 
59
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
60
 
 
61
    def _literal_ck_fixture(
 
62
            self, copy_from=None, table_args=(), table_kwargs={}):
 
63
        m = MetaData()
 
64
        if copy_from is not None:
 
65
            t = copy_from
 
66
        else:
 
67
            t = Table(
 
68
                'tname', m,
 
69
                Column('id', Integer, primary_key=True),
 
70
                Column('email', String()),
 
71
                CheckConstraint("email LIKE '%@%'")
 
72
            )
 
73
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
74
 
 
75
    def _sql_ck_fixture(self, table_args=(), table_kwargs={}):
 
76
        m = MetaData()
 
77
        t = Table(
 
78
            'tname', m,
 
79
            Column('id', Integer, primary_key=True),
 
80
            Column('email', String())
 
81
        )
 
82
        t.append_constraint(CheckConstraint(t.c.email.like('%@%')))
 
83
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
84
 
 
85
    def _fk_fixture(self, table_args=(), table_kwargs={}):
 
86
        m = MetaData()
 
87
        t = Table(
 
88
            'tname', m,
 
89
            Column('id', Integer, primary_key=True),
 
90
            Column('email', String()),
 
91
            Column('user_id', Integer, ForeignKey('user.id'))
 
92
        )
 
93
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
94
 
 
95
    def _multi_fk_fixture(self, table_args=(), table_kwargs={}, schema=None):
 
96
        m = MetaData()
 
97
        if schema:
 
98
            schemaarg = "%s." % schema
 
99
        else:
 
100
            schemaarg = ""
 
101
 
 
102
        t = Table(
 
103
            'tname', m,
 
104
            Column('id', Integer, primary_key=True),
 
105
            Column('email', String()),
 
106
            Column('user_id_1', Integer, ForeignKey('%suser.id' % schemaarg)),
 
107
            Column('user_id_2', Integer, ForeignKey('%suser.id' % schemaarg)),
 
108
            Column('user_id_3', Integer),
 
109
            Column('user_id_version', Integer),
 
110
            ForeignKeyConstraint(
 
111
                ['user_id_3', 'user_id_version'],
 
112
                ['%suser.id' % schemaarg, '%suser.id_version' % schemaarg]),
 
113
            schema=schema
 
114
        )
 
115
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
116
 
 
117
    def _named_fk_fixture(self, table_args=(), table_kwargs={}):
 
118
        m = MetaData()
 
119
        t = Table(
 
120
            'tname', m,
 
121
            Column('id', Integer, primary_key=True),
 
122
            Column('email', String()),
 
123
            Column('user_id', Integer, ForeignKey('user.id', name='ufk'))
 
124
        )
 
125
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
126
 
 
127
    def _selfref_fk_fixture(self, table_args=(), table_kwargs={}):
 
128
        m = MetaData()
 
129
        t = Table(
 
130
            'tname', m,
 
131
            Column('id', Integer, primary_key=True),
 
132
            Column('parent_id', Integer, ForeignKey('tname.id')),
 
133
            Column('data', String)
 
134
        )
 
135
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
136
 
 
137
    def _boolean_fixture(self, table_args=(), table_kwargs={}):
 
138
        m = MetaData()
 
139
        t = Table(
 
140
            'tname', m,
 
141
            Column('id', Integer, primary_key=True),
 
142
            Column('flag', Boolean)
 
143
        )
 
144
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
145
 
 
146
    def _boolean_no_ck_fixture(self, table_args=(), table_kwargs={}):
 
147
        m = MetaData()
 
148
        t = Table(
 
149
            'tname', m,
 
150
            Column('id', Integer, primary_key=True),
 
151
            Column('flag', Boolean(create_constraint=False))
 
152
        )
 
153
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
154
 
 
155
    def _enum_fixture(self, table_args=(), table_kwargs={}):
 
156
        m = MetaData()
 
157
        t = Table(
 
158
            'tname', m,
 
159
            Column('id', Integer, primary_key=True),
 
160
            Column('thing', Enum('a', 'b', 'c'))
 
161
        )
 
162
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
163
 
 
164
    def _server_default_fixture(self, table_args=(), table_kwargs={}):
 
165
        m = MetaData()
 
166
        t = Table(
 
167
            'tname', m,
 
168
            Column('id', Integer, primary_key=True),
 
169
            Column('thing', String(), server_default='')
 
170
        )
 
171
        return ApplyBatchImpl(t, table_args, table_kwargs)
 
172
 
 
173
    def _assert_impl(self, impl, colnames=None,
 
174
                     ddl_contains=None, ddl_not_contains=None,
 
175
                     dialect='default', schema=None):
 
176
        context = op_fixture(dialect=dialect)
 
177
 
 
178
        impl._create(context.impl)
 
179
 
 
180
        if colnames is None:
 
181
            colnames = ['id', 'x', 'y']
 
182
        eq_(impl.new_table.c.keys(), colnames)
 
183
 
 
184
        pk_cols = [col for col in impl.new_table.c if col.primary_key]
 
185
        eq_(list(impl.new_table.primary_key), pk_cols)
 
186
 
 
187
        create_stmt = str(
 
188
            CreateTable(impl.new_table).compile(dialect=context.dialect))
 
189
        create_stmt = re.sub(r'[\n\t]', '', create_stmt)
 
190
 
 
191
        idx_stmt = ""
 
192
        for idx in impl.indexes.values():
 
193
            idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect))
 
194
        for idx in impl.new_indexes.values():
 
195
            impl.new_table.name = impl.table.name
 
196
            idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect))
 
197
            impl.new_table.name = '_alembic_batch_temp'
 
198
        idx_stmt = re.sub(r'[\n\t]', '', idx_stmt)
 
199
 
 
200
        if ddl_contains:
 
201
            assert ddl_contains in create_stmt + idx_stmt
 
202
        if ddl_not_contains:
 
203
            assert ddl_not_contains not in create_stmt + idx_stmt
 
204
 
 
205
        expected = [
 
206
            create_stmt,
 
207
        ]
 
208
 
 
209
        if schema:
 
210
            args = {"schema": "%s." % schema}
 
211
        else:
 
212
            args = {"schema": ""}
 
213
 
 
214
        args['colnames'] = ", ".join([
 
215
            impl.new_table.c[name].name
 
216
            for name in colnames
 
217
            if name in impl.table.c])
 
218
        args['tname_colnames'] = ", ".join(
 
219
            "CAST(%(schema)stname.%(name)s AS %(type)s) AS anon_1" % {
 
220
                'schema': args['schema'],
 
221
                'name': name,
 
222
                'type': impl.new_table.c[name].type
 
223
            }
 
224
            if (
 
225
                impl.new_table.c[name].type._type_affinity
 
226
                is not impl.table.c[name].type._type_affinity)
 
227
            else "%(schema)stname.%(name)s" % {
 
228
                'schema': args['schema'], 'name': name}
 
229
            for name in colnames if name in impl.table.c
 
230
        )
 
231
 
 
232
        expected.extend([
 
233
            'INSERT INTO %(schema)s_alembic_batch_temp (%(colnames)s) '
 
234
            'SELECT %(tname_colnames)s FROM %(schema)stname' % args,
 
235
            'DROP TABLE %(schema)stname' % args,
 
236
            'ALTER TABLE %(schema)s_alembic_batch_temp '
 
237
            'RENAME TO %(schema)stname' % args
 
238
        ])
 
239
        if idx_stmt:
 
240
            expected.append(idx_stmt)
 
241
        context.assert_(*expected)
 
242
        return impl.new_table
 
243
 
 
244
    def test_change_type(self):
 
245
        impl = self._simple_fixture()
 
246
        impl.alter_column('tname', 'x', type_=Integer)
 
247
        new_table = self._assert_impl(impl)
 
248
        assert new_table.c.x.type._type_affinity is Integer
 
249
 
 
250
    def test_rename_col(self):
 
251
        impl = self._simple_fixture()
 
252
        impl.alter_column('tname', 'x', name='q')
 
253
        new_table = self._assert_impl(impl)
 
254
        eq_(new_table.c.x.name, 'q')
 
255
 
 
256
    def test_rename_col_boolean(self):
 
257
        impl = self._boolean_fixture()
 
258
        impl.alter_column('tname', 'flag', name='bflag')
 
259
        new_table = self._assert_impl(
 
260
            impl, ddl_contains="CHECK (bflag IN (0, 1)",
 
261
            colnames=["id", "flag"])
 
262
        eq_(new_table.c.flag.name, 'bflag')
 
263
        eq_(
 
264
            len([
 
265
                const for const
 
266
                in new_table.constraints
 
267
                if isinstance(const, CheckConstraint)]),
 
268
            1)
 
269
 
 
270
    def test_change_type_schematype_to_non(self):
 
271
        impl = self._boolean_fixture()
 
272
        impl.alter_column('tname', 'flag', type_=Integer)
 
273
        new_table = self._assert_impl(
 
274
            impl, colnames=['id', 'flag'],
 
275
            ddl_not_contains="CHECK")
 
276
        assert new_table.c.flag.type._type_affinity is Integer
 
277
 
 
278
        # NOTE: we can't do test_change_type_non_to_schematype
 
279
        # at this level because the "add_constraint" part of this
 
280
        # comes from toimpl.py, which we aren't testing here
 
281
 
 
282
    def test_rename_col_boolean_no_ck(self):
 
283
        impl = self._boolean_no_ck_fixture()
 
284
        impl.alter_column('tname', 'flag', name='bflag')
 
285
        new_table = self._assert_impl(
 
286
            impl, ddl_not_contains="CHECK",
 
287
            colnames=["id", "flag"])
 
288
        eq_(new_table.c.flag.name, 'bflag')
 
289
        eq_(
 
290
            len([
 
291
                const for const
 
292
                in new_table.constraints
 
293
                if isinstance(const, CheckConstraint)]),
 
294
            0)
 
295
 
 
296
    def test_rename_col_enum(self):
 
297
        impl = self._enum_fixture()
 
298
        impl.alter_column('tname', 'thing', name='thang')
 
299
        new_table = self._assert_impl(
 
300
            impl, ddl_contains="CHECK (thang IN ('a', 'b', 'c')",
 
301
            colnames=["id", "thing"])
 
302
        eq_(new_table.c.thing.name, 'thang')
 
303
        eq_(
 
304
            len([
 
305
                const for const
 
306
                in new_table.constraints
 
307
                if isinstance(const, CheckConstraint)]),
 
308
            1)
 
309
 
 
310
    def test_rename_col_literal_ck(self):
 
311
        impl = self._literal_ck_fixture()
 
312
        impl.alter_column('tname', 'email', name='emol')
 
313
        new_table = self._assert_impl(
 
314
            # note this is wrong, we don't dig into the SQL
 
315
            impl, ddl_contains="CHECK (email LIKE '%@%')",
 
316
            colnames=["id", "email"])
 
317
        eq_(
 
318
            len([c for c in new_table.constraints
 
319
                if isinstance(c, CheckConstraint)]), 1)
 
320
 
 
321
        eq_(new_table.c.email.name, 'emol')
 
322
 
 
323
    def test_rename_col_literal_ck_workaround(self):
 
324
        impl = self._literal_ck_fixture(
 
325
            copy_from=Table(
 
326
                'tname', MetaData(),
 
327
                Column('id', Integer, primary_key=True),
 
328
                Column('email', String),
 
329
            ),
 
330
            table_args=[CheckConstraint("emol LIKE '%@%'")])
 
331
 
 
332
        impl.alter_column('tname', 'email', name='emol')
 
333
        new_table = self._assert_impl(
 
334
            impl, ddl_contains="CHECK (emol LIKE '%@%')",
 
335
            colnames=["id", "email"])
 
336
        eq_(
 
337
            len([c for c in new_table.constraints
 
338
                if isinstance(c, CheckConstraint)]), 1)
 
339
        eq_(new_table.c.email.name, 'emol')
 
340
 
 
341
    def test_rename_col_sql_ck(self):
 
342
        impl = self._sql_ck_fixture()
 
343
 
 
344
        impl.alter_column('tname', 'email', name='emol')
 
345
        new_table = self._assert_impl(
 
346
            impl, ddl_contains="CHECK (emol LIKE '%@%')",
 
347
            colnames=["id", "email"])
 
348
        eq_(
 
349
            len([c for c in new_table.constraints
 
350
                if isinstance(c, CheckConstraint)]), 1)
 
351
 
 
352
        eq_(new_table.c.email.name, 'emol')
 
353
 
 
354
    def test_add_col(self):
 
355
        impl = self._simple_fixture()
 
356
        col = Column('g', Integer)
 
357
        # operations.add_column produces a table
 
358
        t = self.op.schema_obj.table('tname', col)  # noqa
 
359
        impl.add_column('tname', col)
 
360
        new_table = self._assert_impl(impl, colnames=['id', 'x', 'y', 'g'])
 
361
        eq_(new_table.c.g.name, 'g')
 
362
 
 
363
    def test_add_server_default(self):
 
364
        impl = self._simple_fixture()
 
365
        impl.alter_column('tname', 'y', server_default="10")
 
366
        new_table = self._assert_impl(
 
367
            impl, ddl_contains="DEFAULT '10'")
 
368
        eq_(
 
369
            new_table.c.y.server_default.arg, "10"
 
370
        )
 
371
 
 
372
    def test_drop_server_default(self):
 
373
        impl = self._server_default_fixture()
 
374
        impl.alter_column('tname', 'thing', server_default=None)
 
375
        new_table = self._assert_impl(
 
376
            impl, colnames=['id', 'thing'], ddl_not_contains="DEFAULT")
 
377
        eq_(new_table.c.thing.server_default, None)
 
378
 
 
379
    def test_rename_col_pk(self):
 
380
        impl = self._simple_fixture()
 
381
        impl.alter_column('tname', 'id', name='foobar')
 
382
        new_table = self._assert_impl(
 
383
            impl, ddl_contains="PRIMARY KEY (foobar)")
 
384
        eq_(new_table.c.id.name, 'foobar')
 
385
        eq_(list(new_table.primary_key), [new_table.c.id])
 
386
 
 
387
    def test_rename_col_fk(self):
 
388
        impl = self._fk_fixture()
 
389
        impl.alter_column('tname', 'user_id', name='foobar')
 
390
        new_table = self._assert_impl(
 
391
            impl, colnames=['id', 'email', 'user_id'],
 
392
            ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)')
 
393
        eq_(new_table.c.user_id.name, 'foobar')
 
394
        eq_(
 
395
            list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
 
396
            "user.id"
 
397
        )
 
398
 
 
399
    def test_regen_multi_fk(self):
 
400
        impl = self._multi_fk_fixture()
 
401
        self._assert_impl(
 
402
            impl, colnames=[
 
403
                'id', 'email', 'user_id_1', 'user_id_2',
 
404
                'user_id_3', 'user_id_version'],
 
405
            ddl_contains='FOREIGN KEY(user_id_3, user_id_version) '
 
406
            'REFERENCES "user" (id, id_version)')
 
407
 
 
408
    # _get_colspec() in 0.8 calls upon fk.column when schema is
 
409
    # present.  not sure if we want to try to fix this
 
410
    @config.requirements.fail_before_sqla_09
 
411
    def test_regen_multi_fk_schema(self):
 
412
        impl = self._multi_fk_fixture(schema='foo_schema')
 
413
        self._assert_impl(
 
414
            impl, colnames=[
 
415
                'id', 'email', 'user_id_1', 'user_id_2',
 
416
                'user_id_3', 'user_id_version'],
 
417
            ddl_contains='FOREIGN KEY(user_id_3, user_id_version) '
 
418
            'REFERENCES foo_schema."user" (id, id_version)',
 
419
            schema='foo_schema')
 
420
 
 
421
    def test_drop_col(self):
 
422
        impl = self._simple_fixture()
 
423
        impl.drop_column('tname', column('x'))
 
424
        new_table = self._assert_impl(impl, colnames=['id', 'y'])
 
425
        assert 'y' in new_table.c
 
426
        assert 'x' not in new_table.c
 
427
 
 
428
    def test_drop_col_remove_pk(self):
 
429
        impl = self._simple_fixture()
 
430
        impl.drop_column('tname', column('id'))
 
431
        new_table = self._assert_impl(
 
432
            impl, colnames=['x', 'y'], ddl_not_contains="PRIMARY KEY")
 
433
        assert 'y' in new_table.c
 
434
        assert 'id' not in new_table.c
 
435
        assert not new_table.primary_key
 
436
 
 
437
    def test_drop_col_remove_fk(self):
 
438
        impl = self._fk_fixture()
 
439
        impl.drop_column('tname', column('user_id'))
 
440
        new_table = self._assert_impl(
 
441
            impl, colnames=['id', 'email'], ddl_not_contains="FOREIGN KEY")
 
442
        assert 'user_id' not in new_table.c
 
443
        assert not new_table.foreign_keys
 
444
 
 
445
    def test_drop_col_retain_fk(self):
 
446
        impl = self._fk_fixture()
 
447
        impl.drop_column('tname', column('email'))
 
448
        new_table = self._assert_impl(
 
449
            impl, colnames=['id', 'user_id'],
 
450
            ddl_contains='FOREIGN KEY(user_id) REFERENCES "user" (id)')
 
451
        assert 'email' not in new_table.c
 
452
        assert new_table.c.user_id.foreign_keys
 
453
 
 
454
    def test_drop_col_retain_fk_selfref(self):
 
455
        impl = self._selfref_fk_fixture()
 
456
        impl.drop_column('tname', column('data'))
 
457
        new_table = self._assert_impl(impl, colnames=['id', 'parent_id'])
 
458
        assert 'data' not in new_table.c
 
459
        assert new_table.c.parent_id.foreign_keys
 
460
 
 
461
    def test_add_fk(self):
 
462
        impl = self._simple_fixture()
 
463
        impl.add_column('tname', Column('user_id', Integer))
 
464
        fk = self.op.schema_obj.foreign_key_constraint(
 
465
            'fk1', 'tname', 'user',
 
466
            ['user_id'], ['id'])
 
467
        impl.add_constraint(fk)
 
468
        new_table = self._assert_impl(
 
469
            impl, colnames=['id', 'x', 'y', 'user_id'],
 
470
            ddl_contains='CONSTRAINT fk1 FOREIGN KEY(user_id) '
 
471
            'REFERENCES "user" (id)')
 
472
        eq_(
 
473
            list(new_table.c.user_id.foreign_keys)[0]._get_colspec(),
 
474
            'user.id'
 
475
        )
 
476
 
 
477
    def test_drop_fk(self):
 
478
        impl = self._named_fk_fixture()
 
479
        fk = ForeignKeyConstraint([], [], name='ufk')
 
480
        impl.drop_constraint(fk)
 
481
        new_table = self._assert_impl(
 
482
            impl, colnames=['id', 'email', 'user_id'],
 
483
            ddl_not_contains="CONSTRANT fk1")
 
484
        eq_(
 
485
            list(new_table.foreign_keys),
 
486
            []
 
487
        )
 
488
 
 
489
    def test_add_uq(self):
 
490
        impl = self._simple_fixture()
 
491
        uq = self.op.schema_obj.unique_constraint(
 
492
            'uq1', 'tname', ['y']
 
493
        )
 
494
 
 
495
        impl.add_constraint(uq)
 
496
        self._assert_impl(
 
497
            impl, colnames=['id', 'x', 'y'],
 
498
            ddl_contains="CONSTRAINT uq1 UNIQUE")
 
499
 
 
500
    def test_drop_uq(self):
 
501
        impl = self._uq_fixture()
 
502
 
 
503
        uq = self.op.schema_obj.unique_constraint(
 
504
            'uq1', 'tname', ['y']
 
505
        )
 
506
        impl.drop_constraint(uq)
 
507
        self._assert_impl(
 
508
            impl, colnames=['id', 'x', 'y'],
 
509
            ddl_not_contains="CONSTRAINT uq1 UNIQUE")
 
510
 
 
511
    def test_create_index(self):
 
512
        impl = self._simple_fixture()
 
513
        ix = self.op.schema_obj.index('ix1', 'tname', ['y'])
 
514
 
 
515
        impl.create_index(ix)
 
516
        self._assert_impl(
 
517
            impl, colnames=['id', 'x', 'y'],
 
518
            ddl_contains="CREATE INDEX ix1")
 
519
 
 
520
    def test_drop_index(self):
 
521
        impl = self._ix_fixture()
 
522
 
 
523
        ix = self.op.schema_obj.index('ix1', 'tname', ['y'])
 
524
        impl.drop_index(ix)
 
525
        self._assert_impl(
 
526
            impl, colnames=['id', 'x', 'y'],
 
527
            ddl_not_contains="CONSTRAINT uq1 UNIQUE")
 
528
 
 
529
    def test_add_table_opts(self):
 
530
        impl = self._simple_fixture(table_kwargs={'mysql_engine': 'InnoDB'})
 
531
        self._assert_impl(
 
532
            impl, ddl_contains="ENGINE=InnoDB",
 
533
            dialect='mysql'
 
534
        )
 
535
 
 
536
 
 
537
class BatchAPITest(TestBase):
 
538
    __requires__ = ('sqlalchemy_08', )
 
539
 
 
540
    @contextmanager
 
541
    def _fixture(self, schema=None):
 
542
        migration_context = mock.Mock(
 
543
            opts={}, impl=mock.MagicMock(__dialect__='sqlite'))
 
544
        op = Operations(migration_context)
 
545
        batch = op.batch_alter_table(
 
546
            'tname', recreate='never', schema=schema).__enter__()
 
547
 
 
548
        mock_schema = mock.MagicMock()
 
549
        with mock.patch("alembic.operations.schemaobj.sa_schema", mock_schema):
 
550
            yield batch
 
551
        batch.impl.flush()
 
552
        self.mock_schema = mock_schema
 
553
 
 
554
    def test_drop_col(self):
 
555
        with self._fixture() as batch:
 
556
            batch.drop_column('q')
 
557
 
 
558
        eq_(
 
559
            batch.impl.operations.impl.mock_calls,
 
560
            [mock.call.drop_column(
 
561
                'tname', self.mock_schema.Column(), schema=None)]
 
562
        )
 
563
 
 
564
    def test_add_col(self):
 
565
        column = Column('w', String(50))
 
566
 
 
567
        with self._fixture() as batch:
 
568
            batch.add_column(column)
 
569
 
 
570
        eq_(
 
571
            batch.impl.operations.impl.mock_calls,
 
572
            [mock.call.add_column(
 
573
                'tname', column, schema=None)]
 
574
        )
 
575
 
 
576
    def test_create_fk(self):
 
577
        with self._fixture() as batch:
 
578
            batch.create_foreign_key('myfk', 'user', ['x'], ['y'])
 
579
 
 
580
        eq_(
 
581
            self.mock_schema.ForeignKeyConstraint.mock_calls,
 
582
            [
 
583
                mock.call(
 
584
                    ['x'], ['user.y'],
 
585
                    onupdate=None, ondelete=None, name='myfk',
 
586
                    initially=None, deferrable=None, match=None)
 
587
            ]
 
588
        )
 
589
        eq_(
 
590
            self.mock_schema.Table.mock_calls,
 
591
            [
 
592
                mock.call(
 
593
                    'user', self.mock_schema.MetaData(),
 
594
                    self.mock_schema.Column(),
 
595
                    schema=None
 
596
                ),
 
597
                mock.call(
 
598
                    'tname', self.mock_schema.MetaData(),
 
599
                    self.mock_schema.Column(),
 
600
                    schema=None
 
601
                ),
 
602
                mock.call().append_constraint(
 
603
                    self.mock_schema.ForeignKeyConstraint())
 
604
            ]
 
605
        )
 
606
        eq_(
 
607
            batch.impl.operations.impl.mock_calls,
 
608
            [mock.call.add_constraint(
 
609
                self.mock_schema.ForeignKeyConstraint())]
 
610
        )
 
611
 
 
612
    def test_create_fk_schema(self):
 
613
        with self._fixture(schema='foo') as batch:
 
614
            batch.create_foreign_key('myfk', 'user', ['x'], ['y'])
 
615
 
 
616
        eq_(
 
617
            self.mock_schema.ForeignKeyConstraint.mock_calls,
 
618
            [
 
619
                mock.call(
 
620
                    ['x'], ['user.y'],
 
621
                    onupdate=None, ondelete=None, name='myfk',
 
622
                    initially=None, deferrable=None, match=None)
 
623
            ]
 
624
        )
 
625
        eq_(
 
626
            self.mock_schema.Table.mock_calls,
 
627
            [
 
628
                mock.call(
 
629
                    'user', self.mock_schema.MetaData(),
 
630
                    self.mock_schema.Column(),
 
631
                    schema=None
 
632
                ),
 
633
                mock.call(
 
634
                    'tname', self.mock_schema.MetaData(),
 
635
                    self.mock_schema.Column(),
 
636
                    schema='foo'
 
637
                ),
 
638
                mock.call().append_constraint(
 
639
                    self.mock_schema.ForeignKeyConstraint())
 
640
            ]
 
641
        )
 
642
        eq_(
 
643
            batch.impl.operations.impl.mock_calls,
 
644
            [mock.call.add_constraint(
 
645
                self.mock_schema.ForeignKeyConstraint())]
 
646
        )
 
647
 
 
648
    def test_create_uq(self):
 
649
        with self._fixture() as batch:
 
650
            batch.create_unique_constraint('uq1', ['a', 'b'])
 
651
 
 
652
        eq_(
 
653
            self.mock_schema.Table().c.__getitem__.mock_calls,
 
654
            [mock.call('a'), mock.call('b')]
 
655
        )
 
656
 
 
657
        eq_(
 
658
            self.mock_schema.UniqueConstraint.mock_calls,
 
659
            [
 
660
                mock.call(
 
661
                    self.mock_schema.Table().c.__getitem__(),
 
662
                    self.mock_schema.Table().c.__getitem__(),
 
663
                    name='uq1'
 
664
                )
 
665
            ]
 
666
        )
 
667
        eq_(
 
668
            batch.impl.operations.impl.mock_calls,
 
669
            [mock.call.add_constraint(
 
670
                self.mock_schema.UniqueConstraint())]
 
671
        )
 
672
 
 
673
    def test_create_pk(self):
 
674
        with self._fixture() as batch:
 
675
            batch.create_primary_key('pk1', ['a', 'b'])
 
676
 
 
677
        eq_(
 
678
            self.mock_schema.Table().c.__getitem__.mock_calls,
 
679
            [mock.call('a'), mock.call('b')]
 
680
        )
 
681
 
 
682
        eq_(
 
683
            self.mock_schema.PrimaryKeyConstraint.mock_calls,
 
684
            [
 
685
                mock.call(
 
686
                    self.mock_schema.Table().c.__getitem__(),
 
687
                    self.mock_schema.Table().c.__getitem__(),
 
688
                    name='pk1'
 
689
                )
 
690
            ]
 
691
        )
 
692
        eq_(
 
693
            batch.impl.operations.impl.mock_calls,
 
694
            [mock.call.add_constraint(
 
695
                self.mock_schema.PrimaryKeyConstraint())]
 
696
        )
 
697
 
 
698
    def test_create_check(self):
 
699
        expr = text("a > b")
 
700
        with self._fixture() as batch:
 
701
            batch.create_check_constraint('ck1', expr)
 
702
 
 
703
        eq_(
 
704
            self.mock_schema.CheckConstraint.mock_calls,
 
705
            [
 
706
                mock.call(
 
707
                    expr, name="ck1"
 
708
                )
 
709
            ]
 
710
        )
 
711
        eq_(
 
712
            batch.impl.operations.impl.mock_calls,
 
713
            [mock.call.add_constraint(
 
714
                self.mock_schema.CheckConstraint())]
 
715
        )
 
716
 
 
717
    def test_drop_constraint(self):
 
718
        with self._fixture() as batch:
 
719
            batch.drop_constraint('uq1')
 
720
 
 
721
        eq_(
 
722
            self.mock_schema.Constraint.mock_calls,
 
723
            [
 
724
                mock.call(name='uq1')
 
725
            ]
 
726
        )
 
727
        eq_(
 
728
            batch.impl.operations.impl.mock_calls,
 
729
            [mock.call.drop_constraint(self.mock_schema.Constraint())]
 
730
        )
 
731
 
 
732
 
 
733
class CopyFromTest(TestBase):
 
734
    __requires__ = ('sqlalchemy_08', )
 
735
 
 
736
    def _fixture(self):
 
737
        self.metadata = MetaData()
 
738
        self.table = Table(
 
739
            'foo', self.metadata,
 
740
            Column('id', Integer, primary_key=True),
 
741
            Column('data', String(50)),
 
742
            Column('x', Integer),
 
743
        )
 
744
 
 
745
        context = op_fixture(dialect="sqlite", as_sql=True)
 
746
        self.op = Operations(context)
 
747
        return context
 
748
 
 
749
    def test_change_type(self):
 
750
        context = self._fixture()
 
751
        with self.op.batch_alter_table(
 
752
                "foo", copy_from=self.table) as batch_op:
 
753
            batch_op.alter_column('data', type_=Integer)
 
754
 
 
755
        context.assert_(
 
756
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
757
            'data INTEGER, x INTEGER, PRIMARY KEY (id))',
 
758
            'INSERT INTO _alembic_batch_temp (id, data, x) SELECT foo.id, '
 
759
            'CAST(foo.data AS INTEGER) AS anon_1, foo.x FROM foo',
 
760
            'DROP TABLE foo',
 
761
            'ALTER TABLE _alembic_batch_temp RENAME TO foo'
 
762
        )
 
763
 
 
764
    def test_change_type_from_schematype(self):
 
765
        context = self._fixture()
 
766
        self.table.append_column(
 
767
            Column('y', Boolean(
 
768
                create_constraint=True, name="ck1")))
 
769
 
 
770
        with self.op.batch_alter_table(
 
771
                "foo", copy_from=self.table) as batch_op:
 
772
            batch_op.alter_column(
 
773
                'y', type_=Integer,
 
774
                existing_type=Boolean(
 
775
                    create_constraint=True, name="ck1"))
 
776
        context.assert_(
 
777
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
778
            'data VARCHAR(50), x INTEGER, y INTEGER, PRIMARY KEY (id))',
 
779
            'INSERT INTO _alembic_batch_temp (id, data, x, y) SELECT foo.id, '
 
780
            'foo.data, foo.x, CAST(foo.y AS INTEGER) AS anon_1 FROM foo',
 
781
            'DROP TABLE foo',
 
782
            'ALTER TABLE _alembic_batch_temp RENAME TO foo'
 
783
        )
 
784
 
 
785
    def test_change_type_to_schematype(self):
 
786
        context = self._fixture()
 
787
        self.table.append_column(
 
788
            Column('y', Integer))
 
789
 
 
790
        with self.op.batch_alter_table(
 
791
                "foo", copy_from=self.table) as batch_op:
 
792
            batch_op.alter_column(
 
793
                'y', existing_type=Integer,
 
794
                type_=Boolean(
 
795
                    create_constraint=True, name="ck1"))
 
796
        context.assert_(
 
797
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
798
            'data VARCHAR(50), x INTEGER, y BOOLEAN, PRIMARY KEY (id), '
 
799
            'CONSTRAINT ck1 CHECK (y IN (0, 1)))',
 
800
            'INSERT INTO _alembic_batch_temp (id, data, x, y) SELECT foo.id, '
 
801
            'foo.data, foo.x, CAST(foo.y AS BOOLEAN) AS anon_1 FROM foo',
 
802
            'DROP TABLE foo',
 
803
            'ALTER TABLE _alembic_batch_temp RENAME TO foo'
 
804
        )
 
805
 
 
806
    def test_create_drop_index_w_always(self):
 
807
        context = self._fixture()
 
808
        with self.op.batch_alter_table(
 
809
                "foo", copy_from=self.table, recreate='always') as batch_op:
 
810
            batch_op.create_index(
 
811
                'ix_data', ['data'], unique=True)
 
812
 
 
813
        context.assert_(
 
814
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
815
            'data VARCHAR(50), '
 
816
            'x INTEGER, PRIMARY KEY (id))',
 
817
            'INSERT INTO _alembic_batch_temp (id, data, x) '
 
818
            'SELECT foo.id, foo.data, foo.x FROM foo',
 
819
            'DROP TABLE foo',
 
820
            'ALTER TABLE _alembic_batch_temp RENAME TO foo',
 
821
            'CREATE UNIQUE INDEX ix_data ON foo (data)',
 
822
        )
 
823
 
 
824
        context.clear_assertions()
 
825
 
 
826
        Index('ix_data', self.table.c.data, unique=True)
 
827
        with self.op.batch_alter_table(
 
828
                "foo", copy_from=self.table, recreate='always') as batch_op:
 
829
            batch_op.drop_index('ix_data')
 
830
 
 
831
        context.assert_(
 
832
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
833
            'data VARCHAR(50), x INTEGER, PRIMARY KEY (id))',
 
834
            'INSERT INTO _alembic_batch_temp (id, data, x) '
 
835
            'SELECT foo.id, foo.data, foo.x FROM foo',
 
836
            'DROP TABLE foo',
 
837
            'ALTER TABLE _alembic_batch_temp RENAME TO foo'
 
838
        )
 
839
 
 
840
    def test_create_drop_index_wo_always(self):
 
841
        context = self._fixture()
 
842
        with self.op.batch_alter_table(
 
843
                "foo", copy_from=self.table) as batch_op:
 
844
            batch_op.create_index(
 
845
                'ix_data', ['data'], unique=True)
 
846
 
 
847
        context.assert_(
 
848
            'CREATE UNIQUE INDEX ix_data ON foo (data)'
 
849
        )
 
850
 
 
851
        context.clear_assertions()
 
852
 
 
853
        Index('ix_data', self.table.c.data, unique=True)
 
854
        with self.op.batch_alter_table(
 
855
                "foo", copy_from=self.table) as batch_op:
 
856
            batch_op.drop_index('ix_data')
 
857
 
 
858
        context.assert_(
 
859
            'DROP INDEX ix_data'
 
860
        )
 
861
 
 
862
    def test_create_drop_index_w_other_ops(self):
 
863
        context = self._fixture()
 
864
        with self.op.batch_alter_table(
 
865
                "foo", copy_from=self.table) as batch_op:
 
866
            batch_op.alter_column('data', type_=Integer)
 
867
            batch_op.create_index(
 
868
                'ix_data', ['data'], unique=True)
 
869
 
 
870
        context.assert_(
 
871
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
872
            'data INTEGER, x INTEGER, PRIMARY KEY (id))',
 
873
            'INSERT INTO _alembic_batch_temp (id, data, x) SELECT foo.id, '
 
874
            'CAST(foo.data AS INTEGER) AS anon_1, foo.x FROM foo',
 
875
            'DROP TABLE foo',
 
876
            'ALTER TABLE _alembic_batch_temp RENAME TO foo',
 
877
            'CREATE UNIQUE INDEX ix_data ON foo (data)',
 
878
        )
 
879
 
 
880
        context.clear_assertions()
 
881
 
 
882
        Index('ix_data', self.table.c.data, unique=True)
 
883
        with self.op.batch_alter_table(
 
884
                "foo", copy_from=self.table) as batch_op:
 
885
            batch_op.drop_index('ix_data')
 
886
            batch_op.alter_column('data', type_=String)
 
887
 
 
888
        context.assert_(
 
889
            'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
 
890
            'data VARCHAR, x INTEGER, PRIMARY KEY (id))',
 
891
            'INSERT INTO _alembic_batch_temp (id, data, x) SELECT foo.id, '
 
892
            'CAST(foo.data AS VARCHAR) AS anon_1, foo.x FROM foo',
 
893
            'DROP TABLE foo',
 
894
            'ALTER TABLE _alembic_batch_temp RENAME TO foo'
 
895
        )
 
896
 
 
897
 
 
898
class BatchRoundTripTest(TestBase):
 
899
    __requires__ = ('sqlalchemy_08', )
 
900
    __only_on__ = "sqlite"
 
901
 
 
902
    def setUp(self):
 
903
        self.conn = config.db.connect()
 
904
        self.metadata = MetaData()
 
905
        t1 = Table(
 
906
            'foo', self.metadata,
 
907
            Column('id', Integer, primary_key=True),
 
908
            Column('data', String(50)),
 
909
            Column('x', Integer),
 
910
            mysql_engine='InnoDB'
 
911
        )
 
912
        t1.create(self.conn)
 
913
 
 
914
        self.conn.execute(
 
915
            t1.insert(),
 
916
            [
 
917
                {"id": 1, "data": "d1", "x": 5},
 
918
                {"id": 2, "data": "22", "x": 6},
 
919
                {"id": 3, "data": "8.5", "x": 7},
 
920
                {"id": 4, "data": "9.46", "x": 8},
 
921
                {"id": 5, "data": "d5", "x": 9}
 
922
            ]
 
923
        )
 
924
        context = MigrationContext.configure(self.conn)
 
925
        self.op = Operations(context)
 
926
 
 
927
    @contextmanager
 
928
    def _sqlite_referential_integrity(self):
 
929
        self.conn.execute("PRAGMA foreign_keys=ON")
 
930
        try:
 
931
            yield
 
932
        finally:
 
933
            self.conn.execute("PRAGMA foreign_keys=OFF")
 
934
 
 
935
    def _no_pk_fixture(self):
 
936
        nopk = Table(
 
937
            'nopk', self.metadata,
 
938
            Column('a', Integer),
 
939
            Column('b', Integer),
 
940
            Column('c', Integer),
 
941
            mysql_engine='InnoDB'
 
942
        )
 
943
        nopk.create(self.conn)
 
944
        self.conn.execute(
 
945
            nopk.insert(),
 
946
            [
 
947
                {"a": 1, "b": 2, "c": 3},
 
948
                {"a": 2, "b": 4, "c": 5},
 
949
            ]
 
950
 
 
951
        )
 
952
        return nopk
 
953
 
 
954
    def _table_w_index_fixture(self):
 
955
        t = Table(
 
956
            't_w_ix', self.metadata,
 
957
            Column('id', Integer, primary_key=True),
 
958
            Column('thing', Integer),
 
959
            Column('data', String(20)),
 
960
        )
 
961
        Index('ix_thing', t.c.thing)
 
962
        t.create(self.conn)
 
963
        return t
 
964
 
 
965
    def _boolean_fixture(self):
 
966
        t = Table(
 
967
            'hasbool', self.metadata,
 
968
            Column('x', Boolean(create_constraint=True, name='ck1')),
 
969
            Column('y', Integer)
 
970
        )
 
971
        t.create(self.conn)
 
972
 
 
973
    def _int_to_boolean_fixture(self):
 
974
        t = Table(
 
975
            'hasbool', self.metadata,
 
976
            Column('x', Integer)
 
977
        )
 
978
        t.create(self.conn)
 
979
 
 
980
    def test_change_type_boolean_to_int(self):
 
981
        self._boolean_fixture()
 
982
        with self.op.batch_alter_table(
 
983
                "hasbool"
 
984
        ) as batch_op:
 
985
            batch_op.alter_column(
 
986
                'x', type_=Integer, existing_type=Boolean(
 
987
                    create_constraint=True, name='ck1'))
 
988
        insp = Inspector.from_engine(config.db)
 
989
 
 
990
        eq_(
 
991
            [c['type']._type_affinity for c in insp.get_columns('hasbool')
 
992
             if c['name'] == 'x'],
 
993
            [Integer]
 
994
        )
 
995
 
 
996
    def test_drop_col_schematype(self):
 
997
        self._boolean_fixture()
 
998
        with self.op.batch_alter_table(
 
999
                "hasbool"
 
1000
        ) as batch_op:
 
1001
            batch_op.drop_column('x')
 
1002
        insp = Inspector.from_engine(config.db)
 
1003
 
 
1004
        assert 'x' not in (c['name'] for c in insp.get_columns('hasbool'))
 
1005
 
 
1006
    def test_change_type_int_to_boolean(self):
 
1007
        self._int_to_boolean_fixture()
 
1008
        with self.op.batch_alter_table(
 
1009
                "hasbool"
 
1010
        ) as batch_op:
 
1011
            batch_op.alter_column(
 
1012
                'x', type_=Boolean(create_constraint=True, name='ck1'))
 
1013
        insp = Inspector.from_engine(config.db)
 
1014
 
 
1015
        if exclusions.against(config, "sqlite"):
 
1016
            eq_(
 
1017
                [c['type']._type_affinity for
 
1018
                 c in insp.get_columns('hasbool') if c['name'] == 'x'],
 
1019
                [Boolean]
 
1020
            )
 
1021
        elif exclusions.against(config, "mysql"):
 
1022
            eq_(
 
1023
                [c['type']._type_affinity for
 
1024
                 c in insp.get_columns('hasbool') if c['name'] == 'x'],
 
1025
                [Integer]
 
1026
            )
 
1027
 
 
1028
    def tearDown(self):
 
1029
        self.metadata.drop_all(self.conn)
 
1030
        self.conn.close()
 
1031
 
 
1032
    def _assert_data(self, data, tablename='foo'):
 
1033
        eq_(
 
1034
            [dict(row) for row
 
1035
             in self.conn.execute("select * from %s" % tablename)],
 
1036
            data
 
1037
        )
 
1038
 
 
1039
    def test_ix_existing(self):
 
1040
        self._table_w_index_fixture()
 
1041
 
 
1042
        with self.op.batch_alter_table("t_w_ix") as batch_op:
 
1043
            batch_op.alter_column('data', type_=String(30))
 
1044
            batch_op.create_index("ix_data", ["data"])
 
1045
 
 
1046
        insp = Inspector.from_engine(config.db)
 
1047
        eq_(
 
1048
            set(
 
1049
                (ix['name'], tuple(ix['column_names'])) for ix in
 
1050
                insp.get_indexes('t_w_ix')
 
1051
            ),
 
1052
            set([
 
1053
                ('ix_data', ('data',)),
 
1054
                ('ix_thing', ('thing', ))
 
1055
            ])
 
1056
        )
 
1057
 
 
1058
    def test_fk_points_to_me_auto(self):
 
1059
        self._test_fk_points_to_me("auto")
 
1060
 
 
1061
    # in particular, this tests that the failures
 
1062
    # on PG and MySQL result in recovery of the batch system,
 
1063
    # e.g. that the _alembic_batch_temp table is dropped
 
1064
    @config.requirements.no_referential_integrity
 
1065
    def test_fk_points_to_me_recreate(self):
 
1066
        self._test_fk_points_to_me("always")
 
1067
 
 
1068
    @exclusions.only_on("sqlite")
 
1069
    @exclusions.fails(
 
1070
        "intentionally asserting that this "
 
1071
        "doesn't work w/ pragma foreign keys")
 
1072
    def test_fk_points_to_me_sqlite_refinteg(self):
 
1073
        with self._sqlite_referential_integrity():
 
1074
            self._test_fk_points_to_me("auto")
 
1075
 
 
1076
    def _test_fk_points_to_me(self, recreate):
 
1077
        bar = Table(
 
1078
            'bar', self.metadata,
 
1079
            Column('id', Integer, primary_key=True),
 
1080
            Column('foo_id', Integer, ForeignKey('foo.id')),
 
1081
            mysql_engine='InnoDB'
 
1082
        )
 
1083
        bar.create(self.conn)
 
1084
        self.conn.execute(bar.insert(), {'id': 1, 'foo_id': 3})
 
1085
 
 
1086
        with self.op.batch_alter_table("foo", recreate=recreate) as batch_op:
 
1087
            batch_op.alter_column(
 
1088
                'data', new_column_name='newdata', existing_type=String(50))
 
1089
 
 
1090
        insp = Inspector.from_engine(self.conn)
 
1091
        eq_(
 
1092
            [(key['referred_table'],
 
1093
             key['referred_columns'], key['constrained_columns'])
 
1094
             for key in insp.get_foreign_keys('bar')],
 
1095
            [('foo', ['id'], ['foo_id'])]
 
1096
        )
 
1097
 
 
1098
    def test_selfref_fk_auto(self):
 
1099
        self._test_selfref_fk("auto")
 
1100
 
 
1101
    @config.requirements.no_referential_integrity
 
1102
    def test_selfref_fk_recreate(self):
 
1103
        self._test_selfref_fk("always")
 
1104
 
 
1105
    @exclusions.only_on("sqlite")
 
1106
    @exclusions.fails(
 
1107
        "intentionally asserting that this "
 
1108
        "doesn't work w/ pragma foreign keys")
 
1109
    def test_selfref_fk_sqlite_refinteg(self):
 
1110
        with self._sqlite_referential_integrity():
 
1111
            self._test_selfref_fk("auto")
 
1112
 
 
1113
    def _test_selfref_fk(self, recreate):
 
1114
        bar = Table(
 
1115
            'bar', self.metadata,
 
1116
            Column('id', Integer, primary_key=True),
 
1117
            Column('bar_id', Integer, ForeignKey('bar.id')),
 
1118
            Column('data', String(50)),
 
1119
            mysql_engine='InnoDB'
 
1120
        )
 
1121
        bar.create(self.conn)
 
1122
        self.conn.execute(bar.insert(), {'id': 1, 'data': 'x', 'bar_id': None})
 
1123
        self.conn.execute(bar.insert(), {'id': 2, 'data': 'y', 'bar_id': 1})
 
1124
 
 
1125
        with self.op.batch_alter_table("bar", recreate=recreate) as batch_op:
 
1126
            batch_op.alter_column(
 
1127
                'data', new_column_name='newdata', existing_type=String(50))
 
1128
 
 
1129
        insp = Inspector.from_engine(self.conn)
 
1130
 
 
1131
        insp = Inspector.from_engine(self.conn)
 
1132
        eq_(
 
1133
            [(key['referred_table'],
 
1134
             key['referred_columns'], key['constrained_columns'])
 
1135
             for key in insp.get_foreign_keys('bar')],
 
1136
            [('bar', ['id'], ['bar_id'])]
 
1137
        )
 
1138
 
 
1139
    def test_change_type(self):
 
1140
        with self.op.batch_alter_table("foo") as batch_op:
 
1141
            batch_op.alter_column('data', type_=Integer)
 
1142
 
 
1143
        self._assert_data([
 
1144
            {"id": 1, "data": 0, "x": 5},
 
1145
            {"id": 2, "data": 22, "x": 6},
 
1146
            {"id": 3, "data": 8, "x": 7},
 
1147
            {"id": 4, "data": 9, "x": 8},
 
1148
            {"id": 5, "data": 0, "x": 9}
 
1149
        ])
 
1150
 
 
1151
    def test_drop_column(self):
 
1152
        with self.op.batch_alter_table("foo") as batch_op:
 
1153
            batch_op.drop_column('data')
 
1154
 
 
1155
        self._assert_data([
 
1156
            {"id": 1, "x": 5},
 
1157
            {"id": 2, "x": 6},
 
1158
            {"id": 3, "x": 7},
 
1159
            {"id": 4, "x": 8},
 
1160
            {"id": 5, "x": 9}
 
1161
        ])
 
1162
 
 
1163
    def test_add_pk_constraint(self):
 
1164
        self._no_pk_fixture()
 
1165
        with self.op.batch_alter_table("nopk", recreate="always") as batch_op:
 
1166
            batch_op.create_primary_key('newpk', ['a', 'b'])
 
1167
 
 
1168
        pk_const = Inspector.from_engine(self.conn).get_pk_constraint('nopk')
 
1169
        with config.requirements.reflects_pk_names.fail_if():
 
1170
            eq_(pk_const['name'], 'newpk')
 
1171
        eq_(pk_const['constrained_columns'], ['a', 'b'])
 
1172
 
 
1173
    @config.requirements.check_constraints_w_enforcement
 
1174
    def test_add_ck_constraint(self):
 
1175
        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
 
1176
            batch_op.create_check_constraint("newck", text("x > 0"))
 
1177
 
 
1178
        # we dont support reflection of CHECK constraints
 
1179
        # so test this by just running invalid data in
 
1180
        foo = self.metadata.tables['foo']
 
1181
 
 
1182
        assert_raises_message(
 
1183
            exc.IntegrityError,
 
1184
            "newck",
 
1185
            self.conn.execute,
 
1186
            foo.insert(), {"id": 6, "data": 5, "x": -2}
 
1187
        )
 
1188
 
 
1189
    @config.requirements.sqlalchemy_094
 
1190
    @config.requirements.unnamed_constraints
 
1191
    def test_drop_foreign_key(self):
 
1192
        bar = Table(
 
1193
            'bar', self.metadata,
 
1194
            Column('id', Integer, primary_key=True),
 
1195
            Column('foo_id', Integer, ForeignKey('foo.id')),
 
1196
            mysql_engine='InnoDB'
 
1197
        )
 
1198
        bar.create(self.conn)
 
1199
        self.conn.execute(bar.insert(), {'id': 1, 'foo_id': 3})
 
1200
 
 
1201
        naming_convention = {
 
1202
            "fk":
 
1203
            "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
 
1204
        }
 
1205
        with self.op.batch_alter_table(
 
1206
                "bar", naming_convention=naming_convention) as batch_op:
 
1207
            batch_op.drop_constraint(
 
1208
                "fk_bar_foo_id_foo", type_="foreignkey")
 
1209
        eq_(
 
1210
            Inspector.from_engine(self.conn).get_foreign_keys('bar'),
 
1211
            []
 
1212
        )
 
1213
 
 
1214
    def test_drop_column_fk_recreate(self):
 
1215
        with self.op.batch_alter_table("foo", recreate='always') as batch_op:
 
1216
            batch_op.drop_column('data')
 
1217
 
 
1218
        self._assert_data([
 
1219
            {"id": 1, "x": 5},
 
1220
            {"id": 2, "x": 6},
 
1221
            {"id": 3, "x": 7},
 
1222
            {"id": 4, "x": 8},
 
1223
            {"id": 5, "x": 9}
 
1224
        ])
 
1225
 
 
1226
    def test_rename_column(self):
 
1227
        with self.op.batch_alter_table("foo") as batch_op:
 
1228
            batch_op.alter_column('x', new_column_name='y')
 
1229
 
 
1230
        self._assert_data([
 
1231
            {"id": 1, "data": "d1", "y": 5},
 
1232
            {"id": 2, "data": "22", "y": 6},
 
1233
            {"id": 3, "data": "8.5", "y": 7},
 
1234
            {"id": 4, "data": "9.46", "y": 8},
 
1235
            {"id": 5, "data": "d5", "y": 9}
 
1236
        ])
 
1237
 
 
1238
    def test_rename_column_boolean(self):
 
1239
        bar = Table(
 
1240
            'bar', self.metadata,
 
1241
            Column('id', Integer, primary_key=True),
 
1242
            Column('flag', Boolean()),
 
1243
            mysql_engine='InnoDB'
 
1244
        )
 
1245
        bar.create(self.conn)
 
1246
        self.conn.execute(bar.insert(), {'id': 1, 'flag': True})
 
1247
        self.conn.execute(bar.insert(), {'id': 2, 'flag': False})
 
1248
 
 
1249
        with self.op.batch_alter_table(
 
1250
            "bar"
 
1251
        ) as batch_op:
 
1252
            batch_op.alter_column(
 
1253
                'flag', new_column_name='bflag', existing_type=Boolean)
 
1254
 
 
1255
        self._assert_data([
 
1256
            {"id": 1, 'bflag': True},
 
1257
            {"id": 2, 'bflag': False},
 
1258
        ], 'bar')
 
1259
 
 
1260
    @config.requirements.non_native_boolean
 
1261
    def test_rename_column_non_native_boolean_no_ck(self):
 
1262
        bar = Table(
 
1263
            'bar', self.metadata,
 
1264
            Column('id', Integer, primary_key=True),
 
1265
            Column('flag', Boolean(create_constraint=False)),
 
1266
            mysql_engine='InnoDB'
 
1267
        )
 
1268
        bar.create(self.conn)
 
1269
        self.conn.execute(bar.insert(), {'id': 1, 'flag': True})
 
1270
        self.conn.execute(bar.insert(), {'id': 2, 'flag': False})
 
1271
        self.conn.execute(bar.insert(), {'id': 3, 'flag': 5})
 
1272
 
 
1273
        with self.op.batch_alter_table(
 
1274
            "bar",
 
1275
            reflect_args=[Column('flag', Boolean(create_constraint=False))]
 
1276
        ) as batch_op:
 
1277
            batch_op.alter_column(
 
1278
                'flag', new_column_name='bflag', existing_type=Boolean)
 
1279
 
 
1280
        self._assert_data([
 
1281
            {"id": 1, 'bflag': True},
 
1282
            {"id": 2, 'bflag': False},
 
1283
            {'id': 3, 'bflag': 5}
 
1284
        ], 'bar')
 
1285
 
 
1286
    def test_drop_column_pk(self):
 
1287
        with self.op.batch_alter_table("foo") as batch_op:
 
1288
            batch_op.drop_column('id')
 
1289
 
 
1290
        self._assert_data([
 
1291
            {"data": "d1", "x": 5},
 
1292
            {"data": "22", "x": 6},
 
1293
            {"data": "8.5", "x": 7},
 
1294
            {"data": "9.46", "x": 8},
 
1295
            {"data": "d5", "x": 9}
 
1296
        ])
 
1297
 
 
1298
    def test_rename_column_pk(self):
 
1299
        with self.op.batch_alter_table("foo") as batch_op:
 
1300
            batch_op.alter_column('id', new_column_name='ident')
 
1301
 
 
1302
        self._assert_data([
 
1303
            {"ident": 1, "data": "d1", "x": 5},
 
1304
            {"ident": 2, "data": "22", "x": 6},
 
1305
            {"ident": 3, "data": "8.5", "x": 7},
 
1306
            {"ident": 4, "data": "9.46", "x": 8},
 
1307
            {"ident": 5, "data": "d5", "x": 9}
 
1308
        ])
 
1309
 
 
1310
    def test_add_column_auto(self):
 
1311
        # note this uses ALTER
 
1312
        with self.op.batch_alter_table("foo") as batch_op:
 
1313
            batch_op.add_column(
 
1314
                Column('data2', String(50), server_default='hi'))
 
1315
 
 
1316
        self._assert_data([
 
1317
            {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'},
 
1318
            {"id": 2, "data": "22", "x": 6, 'data2': 'hi'},
 
1319
            {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'},
 
1320
            {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'},
 
1321
            {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'}
 
1322
        ])
 
1323
 
 
1324
    def test_add_column_recreate(self):
 
1325
        with self.op.batch_alter_table("foo", recreate='always') as batch_op:
 
1326
            batch_op.add_column(
 
1327
                Column('data2', String(50), server_default='hi'))
 
1328
 
 
1329
        self._assert_data([
 
1330
            {"id": 1, "data": "d1", "x": 5, 'data2': 'hi'},
 
1331
            {"id": 2, "data": "22", "x": 6, 'data2': 'hi'},
 
1332
            {"id": 3, "data": "8.5", "x": 7, 'data2': 'hi'},
 
1333
            {"id": 4, "data": "9.46", "x": 8, 'data2': 'hi'},
 
1334
            {"id": 5, "data": "d5", "x": 9, 'data2': 'hi'}
 
1335
        ])
 
1336
 
 
1337
    def test_create_drop_index(self):
 
1338
        insp = Inspector.from_engine(config.db)
 
1339
        eq_(
 
1340
            insp.get_indexes('foo'), []
 
1341
        )
 
1342
 
 
1343
        with self.op.batch_alter_table("foo", recreate='always') as batch_op:
 
1344
            batch_op.create_index(
 
1345
                'ix_data', ['data'], unique=True)
 
1346
 
 
1347
        self._assert_data([
 
1348
            {"id": 1, "data": "d1", "x": 5},
 
1349
            {"id": 2, "data": "22", "x": 6},
 
1350
            {"id": 3, "data": "8.5", "x": 7},
 
1351
            {"id": 4, "data": "9.46", "x": 8},
 
1352
            {"id": 5, "data": "d5", "x": 9}
 
1353
        ])
 
1354
 
 
1355
        insp = Inspector.from_engine(config.db)
 
1356
        eq_(
 
1357
            [
 
1358
                dict(unique=ix['unique'],
 
1359
                     name=ix['name'],
 
1360
                     column_names=ix['column_names'])
 
1361
                for ix in insp.get_indexes('foo')
 
1362
            ],
 
1363
            [{'unique': True, 'name': 'ix_data', 'column_names': ['data']}]
 
1364
        )
 
1365
 
 
1366
        with self.op.batch_alter_table("foo", recreate='always') as batch_op:
 
1367
            batch_op.drop_index('ix_data')
 
1368
 
 
1369
        insp = Inspector.from_engine(config.db)
 
1370
        eq_(
 
1371
            insp.get_indexes('foo'), []
 
1372
        )
 
1373
 
 
1374
 
 
1375
class BatchRoundTripMySQLTest(BatchRoundTripTest):
 
1376
    __only_on__ = "mysql"
 
1377
 
 
1378
    @exclusions.fails()
 
1379
    def test_rename_column_pk(self):
 
1380
        super(BatchRoundTripMySQLTest, self).test_rename_column_pk()
 
1381
 
 
1382
    @exclusions.fails()
 
1383
    def test_rename_column(self):
 
1384
        super(BatchRoundTripMySQLTest, self).test_rename_column()
 
1385
 
 
1386
    @exclusions.fails()
 
1387
    def test_change_type(self):
 
1388
        super(BatchRoundTripMySQLTest, self).test_change_type()
 
1389
 
 
1390
    def test_create_drop_index(self):
 
1391
        super(BatchRoundTripMySQLTest, self).test_create_drop_index()
 
1392
 
 
1393
 
 
1394
class BatchRoundTripPostgresqlTest(BatchRoundTripTest):
 
1395
    __only_on__ = "postgresql"
 
1396
 
 
1397
    @exclusions.fails()
 
1398
    def test_change_type(self):
 
1399
        super(BatchRoundTripPostgresqlTest, self).test_change_type()
 
1400
 
 
1401
    def test_create_drop_index(self):
 
1402
        super(BatchRoundTripPostgresqlTest, self).test_create_drop_index()
 
1403
 
 
1404
    @exclusions.fails()
 
1405
    def test_change_type_int_to_boolean(self):
 
1406
        super(BatchRoundTripPostgresqlTest, self).\
 
1407
            test_change_type_int_to_boolean()
 
1408
 
 
1409
    @exclusions.fails()
 
1410
    def test_change_type_boolean_to_int(self):
 
1411
        super(BatchRoundTripPostgresqlTest, self).\
 
1412
            test_change_type_boolean_to_int()