~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/sql/test_constraints.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from test.lib.testing import assert_raises, assert_raises_message
2
 
from sqlalchemy import *
 
1
from sqlalchemy.testing import assert_raises, assert_raises_message
 
2
from sqlalchemy import Table, Integer, String, Column, PrimaryKeyConstraint,\
 
3
    ForeignKeyConstraint, ForeignKey, UniqueConstraint, Index, MetaData, \
 
4
    CheckConstraint, func
3
5
from sqlalchemy import exc, schema
4
 
from test.lib import *
5
 
from test.lib import config, engines
6
 
from sqlalchemy.engine import ddl
7
 
from test.lib.testing import eq_
8
 
from test.lib.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
9
 
from sqlalchemy.dialects.postgresql import base as postgresql
 
6
from sqlalchemy.testing import fixtures, AssertsExecutionResults, \
 
7
                    AssertsCompiledSQL
 
8
from sqlalchemy import testing
 
9
from sqlalchemy.engine import default
 
10
from sqlalchemy.testing import engines
 
11
from sqlalchemy.testing import eq_
 
12
from sqlalchemy.testing.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
 
13
from sqlalchemy.sql import table, column
10
14
 
11
 
class ConstraintTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
 
15
class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
12
16
    __dialect__ = 'default'
13
17
 
14
 
    def setup(self):
15
 
        global metadata
16
 
        metadata = MetaData(testing.db)
17
 
 
18
 
    def teardown(self):
19
 
        metadata.drop_all()
20
 
 
21
 
    def test_constraint(self):
22
 
        employees = Table('employees', metadata,
 
18
    @testing.provide_metadata
 
19
    def test_pk_fk_constraint_create(self):
 
20
        metadata = self.metadata
 
21
 
 
22
        Table('employees', metadata,
23
23
            Column('id', Integer),
24
24
            Column('soc', String(40)),
25
25
            Column('name', String(30)),
26
26
            PrimaryKeyConstraint('id', 'soc')
27
27
            )
28
 
        elements = Table('elements', metadata,
 
28
        Table('elements', metadata,
29
29
            Column('id', Integer),
30
30
            Column('stuff', String(30)),
31
31
            Column('emp_id', Integer),
32
32
            Column('emp_soc', String(40)),
33
33
            PrimaryKeyConstraint('id', name='elements_primkey'),
34
 
            ForeignKeyConstraint(['emp_id', 'emp_soc'], ['employees.id', 'employees.soc'])
35
 
            )
36
 
        metadata.create_all()
37
 
 
38
 
    def test_double_fk_usage_raises(self):
39
 
        f = ForeignKey('b.id')
40
 
 
41
 
        Column('x', Integer, f)
42
 
        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
43
 
 
44
 
    def test_circular_constraint(self):
45
 
        a = Table("a", metadata,
 
34
            ForeignKeyConstraint(['emp_id', 'emp_soc'],
 
35
                                ['employees.id', 'employees.soc'])
 
36
            )
 
37
        self.assert_sql_execution(
 
38
            testing.db,
 
39
            lambda: metadata.create_all(checkfirst=False),
 
40
            CompiledSQL('CREATE TABLE employees ('
 
41
                    'id INTEGER, '
 
42
                    'soc VARCHAR(40), '
 
43
                    'name VARCHAR(30), '
 
44
                    'PRIMARY KEY (id, soc)'
 
45
                    ')'
 
46
            ),
 
47
            CompiledSQL('CREATE TABLE elements ('
 
48
                    'id INTEGER, '
 
49
                    'stuff VARCHAR(30), '
 
50
                    'emp_id INTEGER, '
 
51
                    'emp_soc VARCHAR(40), '
 
52
                    'CONSTRAINT elements_primkey PRIMARY KEY (id), '
 
53
                    'FOREIGN KEY(emp_id, emp_soc) '
 
54
                            'REFERENCES employees (id, soc)'
 
55
                ')'
 
56
            )
 
57
        )
 
58
 
 
59
 
 
60
    @testing.provide_metadata
 
61
    def test_cyclic_fk_table_constraint_create(self):
 
62
        metadata = self.metadata
 
63
 
 
64
        Table("a", metadata,
46
65
            Column('id', Integer, primary_key=True),
47
66
            Column('bid', Integer),
48
 
            ForeignKeyConstraint(["bid"], ["b.id"], name="afk")
 
67
            ForeignKeyConstraint(["bid"], ["b.id"])
49
68
            )
50
 
        b = Table("b", metadata,
 
69
        Table("b", metadata,
51
70
            Column('id', Integer, primary_key=True),
52
71
            Column("aid", Integer),
53
72
            ForeignKeyConstraint(["aid"], ["a.id"], use_alter=True, name="bfk")
54
73
            )
55
 
        metadata.create_all()
56
 
 
57
 
    def test_circular_constraint_2(self):
58
 
        a = Table("a", metadata,
 
74
        self._assert_cyclic_constraint(metadata)
 
75
 
 
76
    @testing.provide_metadata
 
77
    def test_cyclic_fk_column_constraint_create(self):
 
78
        metadata = self.metadata
 
79
 
 
80
        Table("a", metadata,
59
81
            Column('id', Integer, primary_key=True),
60
82
            Column('bid', Integer, ForeignKey("b.id")),
61
83
            )
62
 
        b = Table("b", metadata,
 
84
        Table("b", metadata,
63
85
            Column('id', Integer, primary_key=True),
64
 
            Column("aid", Integer, ForeignKey("a.id", use_alter=True, name="bfk")),
65
 
            )
66
 
        metadata.create_all()
67
 
 
68
 
    @testing.fails_on('mysql', 'FIXME: unknown')
69
 
    def test_check_constraint(self):
70
 
        foo = Table('foo', metadata,
 
86
            Column("aid", Integer,
 
87
                ForeignKey("a.id", use_alter=True, name="bfk")
 
88
                ),
 
89
            )
 
90
        self._assert_cyclic_constraint(metadata)
 
91
 
 
92
    def _assert_cyclic_constraint(self, metadata):
 
93
        assertions = [
 
94
            CompiledSQL('CREATE TABLE b ('
 
95
                    'id INTEGER NOT NULL, '
 
96
                    'aid INTEGER, '
 
97
                    'PRIMARY KEY (id)'
 
98
                    ')'
 
99
                    ),
 
100
            CompiledSQL('CREATE TABLE a ('
 
101
                    'id INTEGER NOT NULL, '
 
102
                    'bid INTEGER, '
 
103
                    'PRIMARY KEY (id), '
 
104
                    'FOREIGN KEY(bid) REFERENCES b (id)'
 
105
                    ')'
 
106
                ),
 
107
        ]
 
108
        if testing.db.dialect.supports_alter:
 
109
            assertions.append(
 
110
                CompiledSQL('ALTER TABLE b ADD CONSTRAINT bfk '
 
111
                        'FOREIGN KEY(aid) REFERENCES a (id)')
 
112
            )
 
113
        self.assert_sql_execution(
 
114
            testing.db,
 
115
            lambda: metadata.create_all(checkfirst=False),
 
116
            *assertions
 
117
        )
 
118
 
 
119
        assertions = []
 
120
        if testing.db.dialect.supports_alter:
 
121
            assertions.append(CompiledSQL('ALTER TABLE b DROP CONSTRAINT bfk'))
 
122
        assertions.extend([
 
123
            CompiledSQL("DROP TABLE a"),
 
124
            CompiledSQL("DROP TABLE b"),
 
125
            ])
 
126
        self.assert_sql_execution(
 
127
            testing.db,
 
128
            lambda: metadata.drop_all(checkfirst=False),
 
129
            *assertions
 
130
        )
 
131
 
 
132
    @testing.provide_metadata
 
133
    def test_check_constraint_create(self):
 
134
        metadata = self.metadata
 
135
 
 
136
        Table('foo', metadata,
71
137
            Column('id', Integer, primary_key=True),
72
138
            Column('x', Integer),
73
139
            Column('y', Integer),
74
140
            CheckConstraint('x>y'))
75
 
        bar = Table('bar', metadata,
 
141
        Table('bar', metadata,
76
142
            Column('id', Integer, primary_key=True),
77
143
            Column('x', Integer, CheckConstraint('x>7')),
78
144
            Column('z', Integer)
79
145
            )
80
146
 
81
 
        metadata.create_all()
82
 
        foo.insert().execute(id=1,x=9,y=5)
83
 
        assert_raises(exc.DBAPIError, foo.insert().execute, id=2,x=5,y=9)
84
 
        bar.insert().execute(id=1,x=10)
85
 
        assert_raises(exc.DBAPIError, bar.insert().execute, id=2,x=5)
86
 
 
87
 
    def test_unique_constraint(self):
88
 
        foo = Table('foo', metadata,
 
147
        self.assert_sql_execution(
 
148
            testing.db,
 
149
            lambda: metadata.create_all(checkfirst=False),
 
150
            AllOf(
 
151
                CompiledSQL('CREATE TABLE foo ('
 
152
                        'id INTEGER NOT NULL, '
 
153
                        'x INTEGER, '
 
154
                        'y INTEGER, '
 
155
                        'PRIMARY KEY (id), '
 
156
                        'CHECK (x>y)'
 
157
                        ')'
 
158
                        ),
 
159
                CompiledSQL('CREATE TABLE bar ('
 
160
                        'id INTEGER NOT NULL, '
 
161
                        'x INTEGER CHECK (x>7), '
 
162
                        'z INTEGER, '
 
163
                        'PRIMARY KEY (id)'
 
164
                        ')'
 
165
                )
 
166
            )
 
167
        )
 
168
 
 
169
    @testing.provide_metadata
 
170
    def test_unique_constraint_create(self):
 
171
        metadata = self.metadata
 
172
 
 
173
        Table('foo', metadata,
89
174
            Column('id', Integer, primary_key=True),
90
175
            Column('value', String(30), unique=True))
91
 
        bar = Table('bar', metadata,
 
176
        Table('bar', metadata,
92
177
            Column('id', Integer, primary_key=True),
93
178
            Column('value', String(30)),
94
179
            Column('value2', String(30)),
95
180
            UniqueConstraint('value', 'value2', name='uix1')
96
181
            )
97
 
        metadata.create_all()
98
 
        foo.insert().execute(id=1, value='value1')
99
 
        foo.insert().execute(id=2, value='value2')
100
 
        bar.insert().execute(id=1, value='a', value2='a')
101
 
        bar.insert().execute(id=2, value='a', value2='b')
102
 
        assert_raises(exc.DBAPIError, foo.insert().execute, id=3, value='value1')
103
 
        assert_raises(exc.DBAPIError, bar.insert().execute, id=3, value='a', value2='b')
104
 
 
 
182
 
 
183
        self.assert_sql_execution(
 
184
            testing.db,
 
185
            lambda: metadata.create_all(checkfirst=False),
 
186
            AllOf(
 
187
                CompiledSQL('CREATE TABLE foo ('
 
188
                        'id INTEGER NOT NULL, '
 
189
                        'value VARCHAR(30), '
 
190
                        'PRIMARY KEY (id), '
 
191
                        'UNIQUE (value)'
 
192
                    ')'),
 
193
                CompiledSQL('CREATE TABLE bar ('
 
194
                        'id INTEGER NOT NULL, '
 
195
                        'value VARCHAR(30), '
 
196
                        'value2 VARCHAR(30), '
 
197
                        'PRIMARY KEY (id), '
 
198
                        'CONSTRAINT uix1 UNIQUE (value, value2)'
 
199
                        ')')
 
200
            )
 
201
        )
 
202
 
 
203
    @testing.provide_metadata
105
204
    def test_index_create(self):
 
205
        metadata = self.metadata
 
206
 
106
207
        employees = Table('employees', metadata,
107
208
                          Column('id', Integer, primary_key=True),
108
209
                          Column('first_name', String(30)),
109
210
                          Column('last_name', String(30)),
110
211
                          Column('email_address', String(30)))
111
 
        employees.create()
112
212
 
113
213
        i = Index('employee_name_index',
114
214
                  employees.c.last_name, employees.c.first_name)
115
 
        i.create()
116
215
        assert i in employees.indexes
117
216
 
118
217
        i2 = Index('employee_email_index',
119
218
                   employees.c.email_address, unique=True)
120
 
        i2.create()
121
219
        assert i2 in employees.indexes
122
220
 
 
221
        self.assert_sql_execution(
 
222
            testing.db,
 
223
            lambda: metadata.create_all(checkfirst=False),
 
224
            RegexSQL("^CREATE TABLE"),
 
225
            AllOf(
 
226
                CompiledSQL('CREATE INDEX employee_name_index ON '
 
227
                        'employees (last_name, first_name)', []),
 
228
                CompiledSQL('CREATE UNIQUE INDEX employee_email_index ON '
 
229
                        'employees (email_address)', [])
 
230
            )
 
231
        )
 
232
 
 
233
    @testing.provide_metadata
123
234
    def test_index_create_camelcase(self):
124
235
        """test that mixed-case index identifiers are legal"""
125
236
 
 
237
        metadata = self.metadata
 
238
 
126
239
        employees = Table('companyEmployees', metadata,
127
240
                          Column('id', Integer, primary_key=True),
128
241
                          Column('firstName', String(30)),
129
242
                          Column('lastName', String(30)),
130
243
                          Column('emailAddress', String(30)))
131
244
 
132
 
        employees.create()
133
 
 
134
 
        i = Index('employeeNameIndex',
 
245
 
 
246
 
 
247
        Index('employeeNameIndex',
135
248
                  employees.c.lastName, employees.c.firstName)
136
 
        i.create()
137
249
 
138
 
        i = Index('employeeEmailIndex',
 
250
        Index('employeeEmailIndex',
139
251
                  employees.c.emailAddress, unique=True)
140
 
        i.create()
141
 
 
142
 
        # Check that the table is useable. This is mostly for pg,
143
 
        # which can be somewhat sticky with mixed-case identifiers
144
 
        employees.insert().execute(firstName='Joe', lastName='Smith', id=0)
145
 
        ss = employees.select().execute().fetchall()
146
 
        assert ss[0].firstName == 'Joe'
147
 
        assert ss[0].lastName == 'Smith'
148
 
 
 
252
 
 
253
        self.assert_sql_execution(
 
254
            testing.db,
 
255
            lambda: metadata.create_all(checkfirst=False),
 
256
            RegexSQL("^CREATE TABLE"),
 
257
            AllOf(
 
258
                CompiledSQL('CREATE INDEX "employeeNameIndex" ON '
 
259
                        '"companyEmployees" ("lastName", "firstName")', []),
 
260
                CompiledSQL('CREATE UNIQUE INDEX "employeeEmailIndex" ON '
 
261
                        '"companyEmployees" ("emailAddress")', [])
 
262
            )
 
263
        )
 
264
 
 
265
    @testing.provide_metadata
149
266
    def test_index_create_inline(self):
150
 
        """Test indexes defined with tables"""
 
267
        # test an index create using index=True, unique=True
 
268
 
 
269
        metadata = self.metadata
151
270
 
152
271
        events = Table('events', metadata,
153
272
                       Column('id', Integer, primary_key=True),
157
276
                       Column('announcer', String(30)),
158
277
                       Column('winner', String(30)))
159
278
 
160
 
        Index('sport_announcer', events.c.sport, events.c.announcer, unique=True)
 
279
        Index('sport_announcer', events.c.sport, events.c.announcer,
 
280
                                    unique=True)
161
281
        Index('idx_winners', events.c.winner)
162
282
 
163
283
        eq_(
164
 
            set([ ix.name for ix in events.indexes ]),
165
 
            set(['ix_events_name', 'ix_events_location', 'sport_announcer', 'idx_winners'])
 
284
            set(ix.name for ix in events.indexes),
 
285
            set(['ix_events_name', 'ix_events_location',
 
286
                        'sport_announcer', 'idx_winners'])
166
287
        )
167
288
 
168
289
        self.assert_sql_execution(
170
291
            lambda: events.create(testing.db),
171
292
            RegexSQL("^CREATE TABLE events"),
172
293
            AllOf(
173
 
                ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events (name)'),
174
 
                ExactSQL('CREATE INDEX ix_events_location ON events (location)'),
175
 
                ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'),
 
294
                ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events '
 
295
                                        '(name)'),
 
296
                ExactSQL('CREATE INDEX ix_events_location ON events '
 
297
                                        '(location)'),
 
298
                ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events '
 
299
                                        '(sport, announcer)'),
176
300
                ExactSQL('CREATE INDEX idx_winners ON events (winner)')
177
301
            )
178
302
        )
179
303
 
180
 
        # verify that the table is functional
181
 
        events.insert().execute(id=1, name='hockey finals', location='rink',
182
 
                                sport='hockey', announcer='some canadian',
183
 
                                winner='sweden')
184
 
        ss = events.select().execute().fetchall()
 
304
    @testing.provide_metadata
 
305
    def test_index_functional_create(self):
 
306
        metadata = self.metadata
 
307
 
 
308
        t = Table('sometable', metadata,
 
309
                Column('id', Integer, primary_key=True),
 
310
                Column('data', String(50))
 
311
            )
 
312
        Index('myindex', t.c.data.desc())
 
313
        self.assert_sql_execution(
 
314
            testing.db,
 
315
            lambda: t.create(testing.db),
 
316
            CompiledSQL('CREATE TABLE sometable (id INTEGER NOT NULL, '
 
317
                            'data VARCHAR(50), PRIMARY KEY (id))'),
 
318
            ExactSQL('CREATE INDEX myindex ON sometable (data DESC)')
 
319
        )
 
320
 
 
321
class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
 
322
    __dialect__ = 'default'
 
323
 
 
324
    def test_create_plain(self):
 
325
        t = Table('t', MetaData(), Column('x', Integer))
 
326
        i = Index("xyz", t.c.x)
 
327
        self.assert_compile(
 
328
            schema.CreateIndex(i),
 
329
            "CREATE INDEX xyz ON t (x)"
 
330
        )
 
331
 
 
332
    def test_drop_plain_unattached(self):
 
333
        self.assert_compile(
 
334
            schema.DropIndex(Index(name="xyz")),
 
335
            "DROP INDEX xyz"
 
336
        )
 
337
 
 
338
    def test_drop_plain(self):
 
339
        self.assert_compile(
 
340
            schema.DropIndex(Index(name="xyz")),
 
341
            "DROP INDEX xyz"
 
342
        )
 
343
 
 
344
    def test_create_schema(self):
 
345
        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
 
346
        i = Index("xyz", t.c.x)
 
347
        self.assert_compile(
 
348
            schema.CreateIndex(i),
 
349
            "CREATE INDEX xyz ON foo.t (x)"
 
350
        )
 
351
 
 
352
    def test_drop_schema(self):
 
353
        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
 
354
        i = Index("xyz", t.c.x)
 
355
        self.assert_compile(
 
356
            schema.DropIndex(i),
 
357
            "DROP INDEX foo.xyz"
 
358
        )
 
359
 
185
360
 
186
361
    def test_too_long_idx_name(self):
187
362
        dialect = testing.db.dialect.__class__()
219
394
            dialect=dialect
220
395
        )
221
396
 
222
 
    def test_index_declartion_inline(self):
 
397
    def test_functional_index(self):
 
398
        metadata = MetaData()
 
399
        x = Table('x', metadata,
 
400
                Column('q', String(50))
 
401
            )
 
402
        idx = Index('y', func.lower(x.c.q))
 
403
 
 
404
        self.assert_compile(
 
405
            schema.CreateIndex(idx),
 
406
            "CREATE INDEX y ON x (lower(q))"
 
407
        )
 
408
 
 
409
        self.assert_compile(
 
410
            schema.CreateIndex(idx),
 
411
            "CREATE INDEX y ON x (lower(q))",
 
412
            dialect=testing.db.dialect
 
413
        )
 
414
 
 
415
    def test_index_declaration_inline(self):
 
416
        metadata = MetaData()
 
417
 
223
418
        t1 = Table('t1', metadata,
224
419
            Column('x', Integer),
225
420
            Column('y', Integer),
230
425
            "CREATE INDEX foo ON t1 (x, y)"
231
426
        )
232
427
 
233
 
    def test_index_asserts_cols_standalone(self):
234
 
        t1 = Table('t1', metadata,
235
 
            Column('x', Integer)
236
 
        )
237
 
        t2 = Table('t2', metadata,
238
 
            Column('y', Integer)
239
 
        )
240
 
        assert_raises_message(
241
 
            exc.ArgumentError,
242
 
            "Column 't2.y' is not part of table 't1'.",
243
 
            Index,
244
 
            "bar", t1.c.x, t2.c.y
245
 
        )
246
 
 
247
 
    def test_index_asserts_cols_inline(self):
248
 
        t1 = Table('t1', metadata,
249
 
            Column('x', Integer)
250
 
        )
251
 
        assert_raises_message(
252
 
            exc.ArgumentError,
253
 
            "Index 'bar' is against table 't1', and "
254
 
            "cannot be associated with table 't2'.",
255
 
            Table, 't2', metadata,
256
 
                Column('y', Integer),
257
 
                Index('bar', t1.c.x)
258
 
        )
259
 
 
260
 
    def test_raise_index_nonexistent_name(self):
261
 
        m = MetaData()
262
 
        # the KeyError isn't ideal here, a nicer message
263
 
        # perhaps
264
 
        assert_raises(
265
 
            KeyError,
266
 
            Table, 't', m, Column('x', Integer), Index("foo", "q")
267
 
        )
268
 
 
269
 
    def test_raise_not_a_column(self):
270
 
        assert_raises(
271
 
            exc.ArgumentError,
272
 
            Index, "foo", 5
273
 
        )
274
 
 
275
 
    def test_no_warning_w_no_columns(self):
276
 
        Index(name="foo")
277
 
 
278
 
    def test_raise_clauseelement_not_a_column(self):
279
 
        m = MetaData()
280
 
        t2 = Table('t2', m, Column('x', Integer))
281
 
        class SomeClass(object):
282
 
            def __clause_element__(self):
283
 
                return t2
284
 
        assert_raises(
285
 
            exc.ArgumentError,
286
 
            Index, "foo", SomeClass()
287
 
        )
288
 
 
289
 
    def test_create_plain(self):
290
 
        t = Table('t', MetaData(), Column('x', Integer))
291
 
        i = Index("xyz", t.c.x)
292
 
        self.assert_compile(
293
 
            schema.CreateIndex(i),
294
 
            "CREATE INDEX xyz ON t (x)"
295
 
        )
296
 
 
297
 
    def test_drop_plain_unattached(self):
298
 
        self.assert_compile(
299
 
            schema.DropIndex(Index(name="xyz")),
300
 
            "DROP INDEX xyz"
301
 
        )
302
 
 
303
 
    def test_drop_plain(self):
304
 
        t = Table('t', MetaData(), Column('x', Integer))
305
 
        i = Index("xyz", t.c.x)
306
 
        self.assert_compile(
307
 
            schema.DropIndex(Index(name="xyz")),
308
 
            "DROP INDEX xyz"
309
 
        )
310
 
 
311
 
    def test_create_schema(self):
312
 
        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
313
 
        i = Index("xyz", t.c.x)
314
 
        self.assert_compile(
315
 
            schema.CreateIndex(i),
316
 
            "CREATE INDEX xyz ON foo.t (x)"
317
 
        )
318
 
 
319
 
    def test_drop_schema(self):
320
 
        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
321
 
        i = Index("xyz", t.c.x)
322
 
        self.assert_compile(
323
 
            schema.DropIndex(i),
324
 
            "DROP INDEX foo.xyz"
325
 
        )
326
 
 
327
 
class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
328
 
    __dialect__ = 'default'
329
 
 
330
428
    def _test_deferrable(self, constraint_factory):
 
429
        dialect = default.DefaultDialect()
 
430
 
331
431
        t = Table('tbl', MetaData(),
332
432
                  Column('a', Integer),
333
433
                  Column('b', Integer),
334
434
                  constraint_factory(deferrable=True))
335
435
 
336
 
        sql = str(schema.CreateTable(t).compile(bind=testing.db))
 
436
        sql = str(schema.CreateTable(t).compile(dialect=dialect))
337
437
        assert 'DEFERRABLE' in sql, sql
338
438
        assert 'NOT DEFERRABLE' not in sql, sql
339
439
 
342
442
                  Column('b', Integer),
343
443
                  constraint_factory(deferrable=False))
344
444
 
345
 
        sql = str(schema.CreateTable(t).compile(bind=testing.db))
 
445
        sql = str(schema.CreateTable(t).compile(dialect=dialect))
346
446
        assert 'NOT DEFERRABLE' in sql
347
447
 
348
448
 
350
450
                  Column('a', Integer),
351
451
                  Column('b', Integer),
352
452
                  constraint_factory(deferrable=True, initially='IMMEDIATE'))
353
 
        sql = str(schema.CreateTable(t).compile(bind=testing.db))
 
453
        sql = str(schema.CreateTable(t).compile(dialect=dialect))
354
454
        assert 'NOT DEFERRABLE' not in sql
355
455
        assert 'INITIALLY IMMEDIATE' in sql
356
456
 
358
458
                  Column('a', Integer),
359
459
                  Column('b', Integer),
360
460
                  constraint_factory(deferrable=True, initially='DEFERRED'))
361
 
        sql = str(schema.CreateTable(t).compile(bind=testing.db))
 
461
        sql = str(schema.CreateTable(t).compile(dialect=dialect))
362
462
 
363
463
        assert 'NOT DEFERRABLE' not in sql
364
464
        assert 'INITIALLY DEFERRED' in sql
365
465
 
366
466
    def test_column_level_ck_name(self):
367
467
        t = Table('tbl', MetaData(),
368
 
            Column('a', Integer, CheckConstraint("a > 5", name="ck_a_greater_five"))
 
468
            Column('a', Integer, CheckConstraint("a > 5",
 
469
                                name="ck_a_greater_five"))
369
470
        )
370
471
        self.assert_compile(
371
472
            schema.CreateTable(t),
394
495
            "(a) DEFERRABLE INITIALLY DEFERRED)",
395
496
        )
396
497
 
 
498
    def test_fk_match_clause(self):
 
499
        t = Table('tbl', MetaData(),
 
500
                  Column('a', Integer),
 
501
                  Column('b', Integer,
 
502
                         ForeignKey('tbl.a', match="SIMPLE")))
 
503
 
 
504
        self.assert_compile(
 
505
            schema.CreateTable(t),
 
506
            "CREATE TABLE tbl (a INTEGER, b INTEGER, "
 
507
            "FOREIGN KEY(b) REFERENCES tbl "
 
508
            "(a) MATCH SIMPLE)",
 
509
        )
 
510
 
 
511
        self.assert_compile(
 
512
            schema.AddConstraint(list(t.foreign_keys)[0].constraint),
 
513
            "ALTER TABLE tbl ADD FOREIGN KEY(b) "
 
514
            "REFERENCES tbl (a) MATCH SIMPLE"
 
515
        )
 
516
 
397
517
    def test_deferrable_unique(self):
398
518
        factory = lambda **kw: UniqueConstraint('b', **kw)
399
519
        self._test_deferrable(factory)
404
524
 
405
525
    def test_multiple(self):
406
526
        m = MetaData()
407
 
        foo = Table("foo", m,
 
527
        Table("foo", m,
408
528
            Column('id', Integer, primary_key=True),
409
529
            Column('bar', Integer, primary_key=True)
410
530
        )
434
554
 
435
555
        self.assert_compile(
436
556
            schema.CreateTable(t),
437
 
            "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) DEFERRABLE INITIALLY DEFERRED)"
 
557
            "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) "
 
558
                "DEFERRABLE INITIALLY DEFERRED)"
438
559
        )
439
560
 
440
561
    def test_use_alter(self):
441
562
        m = MetaData()
442
 
        t = Table('t', m,
 
563
        Table('t', m,
443
564
                  Column('a', Integer),
444
565
        )
445
566
 
446
 
        t2 = Table('t2', m,
447
 
                Column('a', Integer, ForeignKey('t.a', use_alter=True, name='fk_ta')),
448
 
                Column('b', Integer, ForeignKey('t.a', name='fk_tb')), # to ensure create ordering ...
 
567
        Table('t2', m,
 
568
                Column('a', Integer, ForeignKey('t.a', use_alter=True,
 
569
                                                        name='fk_ta')),
 
570
                Column('b', Integer, ForeignKey('t.a', name='fk_tb'))
449
571
        )
450
572
 
451
573
        e = engines.mock_engine(dialect_name='postgresql')
454
576
 
455
577
        e.assert_sql([
456
578
            'CREATE TABLE t (a INTEGER)',
457
 
            'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb FOREIGN KEY(b) REFERENCES t (a))',
458
 
            'ALTER TABLE t2 ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)',
 
579
            'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb '
 
580
                            'FOREIGN KEY(b) REFERENCES t (a))',
 
581
            'ALTER TABLE t2 '
 
582
                    'ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)',
459
583
            'ALTER TABLE t2 DROP CONSTRAINT fk_ta',
460
584
            'DROP TABLE t2',
461
585
            'DROP TABLE t'
462
586
        ])
463
587
 
464
 
 
465
 
    def test_add_drop_constraint(self):
 
588
    def _constraint_create_fixture(self):
466
589
        m = MetaData()
467
590
 
468
591
        t = Table('tbl', m,
475
598
                Column('b', Integer)
476
599
        )
477
600
 
478
 
        constraint = CheckConstraint('a < b',name="my_test_constraint",
479
 
                                        deferrable=True,initially='DEFERRED', table=t)
480
 
 
 
601
        return t, t2
 
602
 
 
603
    def test_render_ck_constraint_inline(self):
 
604
        t, t2 = self._constraint_create_fixture()
 
605
 
 
606
        CheckConstraint('a < b', name="my_test_constraint",
 
607
                                        deferrable=True, initially='DEFERRED',
 
608
                                        table=t)
481
609
 
482
610
        # before we create an AddConstraint,
483
611
        # the CONSTRAINT comes out inline
486
614
            "CREATE TABLE tbl ("
487
615
            "a INTEGER, "
488
616
            "b INTEGER, "
489
 
            "CONSTRAINT my_test_constraint CHECK (a < b) DEFERRABLE INITIALLY DEFERRED"
 
617
            "CONSTRAINT my_test_constraint CHECK (a < b) "
 
618
                        "DEFERRABLE INITIALLY DEFERRED"
490
619
            ")"
491
620
        )
492
621
 
 
622
    def test_render_ck_constraint_external(self):
 
623
        t, t2 = self._constraint_create_fixture()
 
624
 
 
625
        constraint = CheckConstraint('a < b', name="my_test_constraint",
 
626
                                        deferrable=True, initially='DEFERRED',
 
627
                                        table=t)
 
628
 
493
629
        self.assert_compile(
494
630
            schema.AddConstraint(constraint),
495
631
            "ALTER TABLE tbl ADD CONSTRAINT my_test_constraint "
496
632
                    "CHECK (a < b) DEFERRABLE INITIALLY DEFERRED"
497
633
        )
498
634
 
 
635
    def test_external_ck_constraint_cancels_internal(self):
 
636
        t, t2 = self._constraint_create_fixture()
 
637
 
 
638
        constraint = CheckConstraint('a < b', name="my_test_constraint",
 
639
                                        deferrable=True, initially='DEFERRED',
 
640
                                        table=t)
 
641
 
 
642
        schema.AddConstraint(constraint)
 
643
 
499
644
        # once we make an AddConstraint,
500
645
        # inline compilation of the CONSTRAINT
501
646
        # is disabled
507
652
            ")"
508
653
        )
509
654
 
 
655
    def test_render_drop_constraint(self):
 
656
        t, t2 = self._constraint_create_fixture()
 
657
 
 
658
        constraint = CheckConstraint('a < b', name="my_test_constraint",
 
659
                                        deferrable=True, initially='DEFERRED',
 
660
                                        table=t)
 
661
 
510
662
        self.assert_compile(
511
663
            schema.DropConstraint(constraint),
512
664
            "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint"
513
665
        )
514
666
 
 
667
    def test_render_drop_constraint_cascade(self):
 
668
        t, t2 = self._constraint_create_fixture()
 
669
 
 
670
        constraint = CheckConstraint('a < b', name="my_test_constraint",
 
671
                                        deferrable=True, initially='DEFERRED',
 
672
                                        table=t)
 
673
 
515
674
        self.assert_compile(
516
675
            schema.DropConstraint(constraint, cascade=True),
517
676
            "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint CASCADE"
518
677
        )
519
678
 
 
679
    def test_render_add_fk_constraint_stringcol(self):
 
680
        t, t2 = self._constraint_create_fixture()
 
681
 
520
682
        constraint = ForeignKeyConstraint(["b"], ["t2.a"])
521
683
        t.append_constraint(constraint)
522
684
        self.assert_compile(
524
686
            "ALTER TABLE tbl ADD FOREIGN KEY(b) REFERENCES t2 (a)"
525
687
        )
526
688
 
 
689
    def test_render_add_fk_constraint_realcol(self):
 
690
        t, t2 = self._constraint_create_fixture()
 
691
 
527
692
        constraint = ForeignKeyConstraint([t.c.a], [t2.c.b])
528
693
        t.append_constraint(constraint)
529
694
        self.assert_compile(
531
696
            "ALTER TABLE tbl ADD FOREIGN KEY(a) REFERENCES t2 (b)"
532
697
        )
533
698
 
 
699
    def test_render_add_uq_constraint_stringcol(self):
 
700
        t, t2 = self._constraint_create_fixture()
 
701
 
534
702
        constraint = UniqueConstraint("a", "b", name="uq_cst")
535
703
        t2.append_constraint(constraint)
536
704
        self.assert_compile(
538
706
            "ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)"
539
707
        )
540
708
 
 
709
    def test_render_add_uq_constraint_realcol(self):
 
710
        t, t2 = self._constraint_create_fixture()
 
711
 
541
712
        constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2")
542
713
        self.assert_compile(
543
714
            schema.AddConstraint(constraint),
544
715
            "ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)"
545
716
        )
546
717
 
 
718
    def test_render_add_pk_constraint(self):
 
719
        t, t2 = self._constraint_create_fixture()
 
720
 
547
721
        assert t.c.a.primary_key is False
548
722
        constraint = PrimaryKeyConstraint(t.c.a)
549
723
        assert t.c.a.primary_key is True
552
726
            "ALTER TABLE tbl ADD PRIMARY KEY (a)"
553
727
        )
554
728
 
555
 
 
 
729
class ConstraintAPITest(fixtures.TestBase):
 
730
    def test_double_fk_usage_raises(self):
 
731
        f = ForeignKey('b.id')
 
732
 
 
733
        Column('x', Integer, f)
 
734
        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
 
735
 
 
736
    def test_auto_append_constraint(self):
 
737
        m = MetaData()
 
738
 
 
739
        t = Table('tbl', m,
 
740
                  Column('a', Integer),
 
741
                  Column('b', Integer)
 
742
        )
 
743
 
 
744
        t2 = Table('t2', m,
 
745
                Column('a', Integer),
 
746
                Column('b', Integer)
 
747
        )
 
748
 
 
749
        for c in (
 
750
            UniqueConstraint(t.c.a),
 
751
            CheckConstraint(t.c.a > 5),
 
752
            ForeignKeyConstraint([t.c.a], [t2.c.a]),
 
753
            PrimaryKeyConstraint(t.c.a)
 
754
        ):
 
755
            assert c in t.constraints
 
756
            t.append_constraint(c)
 
757
            assert c in t.constraints
 
758
 
 
759
        c = Index('foo', t.c.a)
 
760
        assert c in t.indexes
 
761
 
 
762
    def test_auto_append_lowercase_table(self):
 
763
        t = table('t', column('a'))
 
764
        t2 = table('t2', column('a'))
 
765
        for c in (
 
766
            UniqueConstraint(t.c.a),
 
767
            CheckConstraint(t.c.a > 5),
 
768
            ForeignKeyConstraint([t.c.a], [t2.c.a]),
 
769
            PrimaryKeyConstraint(t.c.a),
 
770
            Index('foo', t.c.a)
 
771
        ):
 
772
            assert True
 
773
 
 
774
    def test_tometadata_ok(self):
 
775
        m = MetaData()
 
776
 
 
777
        t = Table('tbl', m,
 
778
                  Column('a', Integer),
 
779
                  Column('b', Integer)
 
780
        )
 
781
 
 
782
        t2 = Table('t2', m,
 
783
                Column('a', Integer),
 
784
                Column('b', Integer)
 
785
        )
 
786
 
 
787
        UniqueConstraint(t.c.a)
 
788
        CheckConstraint(t.c.a > 5)
 
789
        ForeignKeyConstraint([t.c.a], [t2.c.a])
 
790
        PrimaryKeyConstraint(t.c.a)
 
791
 
 
792
        m2 = MetaData()
 
793
 
 
794
        t3 = t.tometadata(m2)
 
795
 
 
796
        eq_(len(t3.constraints), 4)
 
797
 
 
798
        for c in t3.constraints:
 
799
            assert c.table is t3
 
800
 
 
801
    def test_check_constraint_copy(self):
 
802
        m = MetaData()
 
803
        t = Table('tbl', m,
 
804
                  Column('a', Integer),
 
805
                  Column('b', Integer)
 
806
        )
 
807
        ck = CheckConstraint(t.c.a > 5)
 
808
        ck2 = ck.copy()
 
809
        assert ck in t.constraints
 
810
        assert ck2 not in t.constraints
 
811
 
 
812
 
 
813
    def test_ambig_check_constraint_auto_append(self):
 
814
        m = MetaData()
 
815
 
 
816
        t = Table('tbl', m,
 
817
                  Column('a', Integer),
 
818
                  Column('b', Integer)
 
819
        )
 
820
 
 
821
        t2 = Table('t2', m,
 
822
                Column('a', Integer),
 
823
                Column('b', Integer)
 
824
        )
 
825
        c = CheckConstraint(t.c.a > t2.c.b)
 
826
        assert c not in t.constraints
 
827
        assert c not in t2.constraints
 
828
 
 
829
    def test_index_asserts_cols_standalone(self):
 
830
        metadata = MetaData()
 
831
 
 
832
        t1 = Table('t1', metadata,
 
833
            Column('x', Integer)
 
834
        )
 
835
        t2 = Table('t2', metadata,
 
836
            Column('y', Integer)
 
837
        )
 
838
        assert_raises_message(
 
839
            exc.ArgumentError,
 
840
            "Column 't2.y' is not part of table 't1'.",
 
841
            Index,
 
842
            "bar", t1.c.x, t2.c.y
 
843
        )
 
844
 
 
845
    def test_index_asserts_cols_inline(self):
 
846
        metadata = MetaData()
 
847
 
 
848
        t1 = Table('t1', metadata,
 
849
            Column('x', Integer)
 
850
        )
 
851
        assert_raises_message(
 
852
            exc.ArgumentError,
 
853
            "Index 'bar' is against table 't1', and "
 
854
            "cannot be associated with table 't2'.",
 
855
            Table, 't2', metadata,
 
856
                Column('y', Integer),
 
857
                Index('bar', t1.c.x)
 
858
        )
 
859
 
 
860
    def test_raise_index_nonexistent_name(self):
 
861
        m = MetaData()
 
862
        # the KeyError isn't ideal here, a nicer message
 
863
        # perhaps
 
864
        assert_raises(
 
865
            KeyError,
 
866
            Table, 't', m, Column('x', Integer), Index("foo", "q")
 
867
        )
 
868
 
 
869
    def test_raise_not_a_column(self):
 
870
        assert_raises(
 
871
            exc.ArgumentError,
 
872
            Index, "foo", 5
 
873
        )
 
874
 
 
875
    def test_raise_expr_no_column(self):
 
876
        idx = Index('foo', func.lower(5))
 
877
 
 
878
        assert_raises_message(
 
879
            exc.CompileError,
 
880
            "Index 'foo' is not associated with any table.",
 
881
            schema.CreateIndex(idx).compile, dialect=testing.db.dialect
 
882
        )
 
883
        assert_raises_message(
 
884
            exc.CompileError,
 
885
            "Index 'foo' is not associated with any table.",
 
886
            schema.CreateIndex(idx).compile
 
887
        )
 
888
 
 
889
 
 
890
    def test_no_warning_w_no_columns(self):
 
891
        # I think the test here is, there is no warning.
 
892
        # people want to create empty indexes for the purpose of
 
893
        # a drop.
 
894
        idx = Index(name="foo")
 
895
 
 
896
        assert_raises_message(
 
897
            exc.CompileError,
 
898
            "Index 'foo' is not associated with any table.",
 
899
            schema.CreateIndex(idx).compile, dialect=testing.db.dialect
 
900
        )
 
901
        assert_raises_message(
 
902
            exc.CompileError,
 
903
            "Index 'foo' is not associated with any table.",
 
904
            schema.CreateIndex(idx).compile
 
905
        )
 
906
 
 
907
    def test_raise_clauseelement_not_a_column(self):
 
908
        m = MetaData()
 
909
        t2 = Table('t2', m, Column('x', Integer))
 
910
        class SomeClass(object):
 
911
            def __clause_element__(self):
 
912
                return t2
 
913
        assert_raises(
 
914
            exc.ArgumentError,
 
915
            Index, "foo", SomeClass()
 
916
        )