~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to lib/sqlalchemy/testing/suite/test_reflection.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build-depend on python3-all instead of python3-all-dev; extensions are not
  built for Python 3.x

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from __future__ import with_statement
 
2
 
 
3
import sqlalchemy as sa
 
4
from sqlalchemy import exc as sa_exc
 
5
from sqlalchemy import types as sql_types
 
6
from sqlalchemy import inspect
 
7
from sqlalchemy import MetaData, Integer, String
 
8
from sqlalchemy.engine.reflection import Inspector
 
9
from sqlalchemy.testing import engines, fixtures
 
10
from sqlalchemy.testing.schema import Table, Column
 
11
from sqlalchemy.testing import eq_, assert_raises_message
 
12
from sqlalchemy import testing
 
13
from .. import config
 
14
 
 
15
from sqlalchemy.schema import DDL, Index
 
16
from sqlalchemy import event
 
17
 
 
18
# NOTE(review): module-level placeholders; nothing in the code visible here
# reads them (every later use of these names is a local or a parameter) --
# presumably legacy, confirm before removing.
metadata, users = None, None
 
19
 
 
20
 
 
21
class HasTableTest(fixtures.TablesTest):
    """Check the dialect's ``has_table()`` against one known fixture table."""

    @classmethod
    def define_tables(cls, metadata):
        """Register the single ``test_table`` fixture on *metadata*."""
        id_column = Column('id', Integer, primary_key=True)
        data_column = Column('data', String(50))
        Table('test_table', metadata, id_column, data_column)

    def test_has_table(self):
        """``has_table()`` is true for the fixture, false for a bogus name."""
        with config.db.begin() as conn:
            found = config.db.dialect.has_table(conn, "test_table")
            assert found
            missing = config.db.dialect.has_table(conn, "nonexistent_table")
            assert not missing
 
33
 
 
34
 
 
35
 
 
36
class ComponentReflectionTest(fixtures.TablesTest):
    """Exercise the Inspector reflection API against a small fixture schema.

    The fixture consists of the ``users``, ``email_addresses`` and
    ``dingalings`` tables, plus two indexes on ``users`` and ``*_v`` views
    when the corresponding requirements are enabled.  Everything is defined
    in the default schema and, when schemas are enabled, once more in
    ``test_schema``.
    """

    # NOTE(review): presumably tells TablesTest not to insert or delete rows
    # around each test -- confirm against fixtures.TablesTest.
    run_inserts = run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Define the fixture in the default schema and, if supported,
        in ``test_schema`` as well."""
        cls.define_reflected_tables(metadata, None)
        if testing.requires.schemas.enabled:
            cls.define_reflected_tables(metadata, "test_schema")

    @classmethod
    def define_reflected_tables(cls, metadata, schema):
        """Define the three fixture tables (and optional indexes/views).

        :param metadata: MetaData collection the tables are attached to.
        :param schema: schema name to create the tables in, or ``None``
         for the default schema.
        """
        if schema:
            schema_prefix = schema + "."
        else:
            schema_prefix = ""

        # ``users`` only carries the self-referential ``parent_user_id``
        # foreign key when the backend supports such keys.
        if testing.requires.self_referential_foreign_keys.enabled:
            users = Table('users', metadata,
                Column('user_id', sa.INT, primary_key=True),
                Column('test1', sa.CHAR(5), nullable=False),
                Column('test2', sa.Float(5), nullable=False),
                Column('parent_user_id', sa.Integer,
                            sa.ForeignKey('%susers.user_id' % schema_prefix)),
                schema=schema,
                test_needs_fk=True,
            )
        else:
            users = Table('users', metadata,
                Column('user_id', sa.INT, primary_key=True),
                Column('test1', sa.CHAR(5), nullable=False),
                Column('test2', sa.Float(5), nullable=False),
                schema=schema,
                test_needs_fk=True,
            )

        Table("dingalings", metadata,
                  Column('dingaling_id', sa.Integer, primary_key=True),
                  Column('address_id', sa.Integer,
                    sa.ForeignKey('%semail_addresses.address_id' %
                                    schema_prefix)),
                  Column('data', sa.String(30)),
                  schema=schema,
                  test_needs_fk=True,
            )
        Table('email_addresses', metadata,
            Column('address_id', sa.Integer),
            Column('remote_user_id', sa.Integer,
                   sa.ForeignKey(users.c.user_id)),
            Column('email_address', sa.String(20)),
            # explicitly named so that PK-name reflection can be checked
            sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
            schema=schema,
            test_needs_fk=True,
        )

        if testing.requires.index_reflection.enabled:
            cls.define_index(metadata, users)
        if testing.requires.view_reflection.enabled:
            cls.define_views(metadata, schema)

    @classmethod
    def define_index(cls, metadata, users):
        """Attach the two non-unique indexes expected by the index tests
        to the given ``users`` table."""
        Index("users_t_idx", users.c.test1, users.c.test2)
        Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)

    @classmethod
    def define_views(cls, metadata, schema):
        """Register DDL that creates/drops a ``<table>_v`` view for
        ``users`` and ``email_addresses``.

        The CREATE VIEW statement is hooked to the metadata's
        ``after_create`` event and the DROP VIEW to ``before_drop``.
        """
        for table_name in ('users', 'email_addresses'):
            fullname = table_name
            if schema:
                fullname = "%s.%s" % (schema, table_name)
            view_name = fullname + '_v'
            query = "CREATE VIEW %s AS SELECT * FROM %s" % (
                                view_name, fullname)

            event.listen(
                metadata,
                "after_create",
                DDL(query)
            )
            event.listen(
                metadata,
                "before_drop",
                DDL("DROP VIEW %s" % view_name)
            )

    @testing.requires.schema_reflection
    def test_get_schema_names(self):
        """``test_schema`` is among the reflected schema names."""
        insp = inspect(testing.db)

        self.assert_('test_schema' in insp.get_schema_names())

    @testing.requires.schema_reflection
    def test_dialect_initialize(self):
        """A fresh engine's dialect gains ``default_schema_name`` only
        after ``inspect()`` has been called on the engine."""
        engine = engines.testing_engine()
        assert not hasattr(engine.dialect, 'default_schema_name')
        inspect(engine)
        assert hasattr(engine.dialect, 'default_schema_name')

    @testing.requires.schema_reflection
    def test_get_default_schema_name(self):
        """The inspector reports the dialect's default schema name."""
        insp = inspect(testing.db)
        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)

    @testing.provide_metadata
    def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        """Compare reflected table or view names with the fixture's names.

        :param schema: schema to reflect from, ``None`` for the default.
        :param table_type: ``'view'`` to check view names; anything else
         checks table names.
        :param order_by: passed through to ``get_table_names()``; with
         ``'foreign_key'`` the returned order itself is asserted,
         otherwise the names are sorted before comparing.
        """
        insp = inspect(self.metadata.bind)
        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            answer = ['email_addresses_v', 'users_v']
            # The view names were previously computed but never compared,
            # so the view tests could not fail.
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)

    @testing.requires.table_reflection
    def test_get_table_names(self):
        """Table names in the default schema."""
        self._test_get_table_names()

    @testing.requires.table_reflection
    @testing.requires.foreign_key_constraint_reflection
    def test_get_table_names_fks(self):
        """Table names ordered by foreign key dependency."""
        self._test_get_table_names(order_by='foreign_key')

    @testing.requires.table_reflection
    @testing.requires.schemas
    def test_get_table_names_with_schema(self):
        """Table names in ``test_schema``."""
        self._test_get_table_names('test_schema')

    @testing.requires.view_reflection
    def test_get_view_names(self):
        """View names in the default schema."""
        self._test_get_table_names(table_type='view')

    @testing.requires.view_reflection
    @testing.requires.schemas
    def test_get_view_names_with_schema(self):
        """View names in ``test_schema``."""
        self._test_get_table_names('test_schema', table_type='view')

    def _test_get_columns(self, schema=None, table_type='table'):
        """Check reflected columns of ``users`` and ``email_addresses``
        (or of their ``*_v`` views) against the fixture Table objects.

        For every column the name and position must match, the reflected
        type must share a generic base type with the declared type, and
        non-primary-key columns must reflect a default of ``None``.

        :param schema: schema to reflect from, ``None`` for the default.
        :param table_type: ``'view'`` to reflect the views instead of the
         tables.
        """
        meta = MetaData(testing.db)
        users, addresses = self.tables.users, self.tables.email_addresses
        table_names = ['users', 'email_addresses']
        if table_type == 'view':
            table_names = ['users_v', 'email_addresses_v']
        insp = inspect(meta.bind)
        for table_name, table in zip(table_names, (users, addresses)):
            cols = insp.get_columns(table_name, schema=schema)
            self.assert_(len(cols) > 0, len(cols))

            # should be in order

            for i, col in enumerate(table.columns):
                eq_(col.name, cols[i]['name'])
                ctype = cols[i]['type'].__class__
                ctype_def = col.type
                if isinstance(ctype_def, sa.types.TypeEngine):
                    ctype_def = ctype_def.__class__

                # Oracle returns Date for DateTime.

                if testing.against('oracle') and ctype_def \
                    in (sql_types.Date, sql_types.DateTime):
                    ctype_def = sql_types.Date

                # assert that the desired type and return type share
                # a base within one of the generic types.

                self.assert_(len(set(ctype.__mro__).
                    intersection(ctype_def.__mro__).intersection([
                    sql_types.Integer,
                    sql_types.Numeric,
                    sql_types.DateTime,
                    sql_types.Date,
                    sql_types.Time,
                    sql_types.String,
                    sql_types._Binary,
                    ])) > 0, '%s(%s), %s(%s)' % (col.name,
                            col.type, cols[i]['name'], ctype))

                if not col.primary_key:
                    assert cols[i]['default'] is None

    @testing.requires.table_reflection
    def test_get_columns(self):
        """Table columns in the default schema."""
        self._test_get_columns()

    @testing.requires.table_reflection
    @testing.requires.schemas
    def test_get_columns_with_schema(self):
        """Table columns in ``test_schema``."""
        self._test_get_columns(schema='test_schema')

    @testing.requires.view_reflection
    def test_get_view_columns(self):
        """View columns in the default schema."""
        self._test_get_columns(table_type='view')

    @testing.requires.view_reflection
    @testing.requires.schemas
    def test_get_view_columns_with_schema(self):
        """View columns in ``test_schema``."""
        self._test_get_columns(schema='test_schema', table_type='view')

    @testing.provide_metadata
    def _test_get_pk_constraint(self, schema=None):
        """Check the reflected primary key columns of ``users`` and
        ``email_addresses``, and the explicit PK name of the latter.

        :param schema: schema to reflect from, ``None`` for the default.
        """
        meta = self.metadata
        users, addresses = self.tables.users, self.tables.email_addresses
        insp = inspect(meta.bind)

        users_cons = insp.get_pk_constraint(users.name, schema=schema)
        users_pkeys = users_cons['constrained_columns']
        eq_(users_pkeys,  ['user_id'])

        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
        addr_pkeys = addr_cons['constrained_columns']
        eq_(addr_pkeys,  ['address_id'])

        # NOTE(review): presumably fail_if() marks this check as an expected
        # failure on backends that do not reflect PK names -- confirm.
        with testing.requires.reflects_pk_names.fail_if():
            eq_(addr_cons['name'], 'email_ad_pk')

    @testing.requires.primary_key_constraint_reflection
    def test_get_pk_constraint(self):
        """Primary key constraints in the default schema."""
        self._test_get_pk_constraint()

    @testing.requires.table_reflection
    @testing.requires.primary_key_constraint_reflection
    @testing.requires.schemas
    def test_get_pk_constraint_with_schema(self):
        """Primary key constraints in ``test_schema``."""
        self._test_get_pk_constraint(schema='test_schema')

    @testing.requires.table_reflection
    @testing.provide_metadata
    def test_deprecated_get_primary_keys(self):
        """``Inspector.get_primary_keys()`` raises the deprecation warning
        that points at ``get_pk_constraint``."""
        meta = self.metadata
        users = self.tables.users
        insp = Inspector(meta.bind)
        assert_raises_message(
            sa_exc.SADeprecationWarning,
            "Call to deprecated method get_primary_keys."
            "  Use get_pk_constraint instead.",
            insp.get_primary_keys, users.name
        )

    @testing.provide_metadata
    def _test_get_foreign_keys(self, schema=None):
        """Check the first reflected foreign key of ``users`` (when it has
        one) and of ``email_addresses``.

        :param schema: schema to reflect from, ``None`` for the default;
         also the expected ``referred_schema`` of each key.
        """
        meta = self.metadata
        users, addresses = self.tables.users, self.tables.email_addresses
        insp = inspect(meta.bind)
        expected_schema = schema

        # users: define_reflected_tables() only gives this table its
        # self-referential foreign key when the backend supports such keys,
        # so only then is there a first key to look at.
        if testing.requires.self_referential_foreign_keys.enabled:
            users_fkeys = insp.get_foreign_keys(users.name,
                                                schema=schema)
            fkey1 = users_fkeys[0]

            with testing.requires.named_constraints.fail_if():
                self.assert_(fkey1['name'] is not None)

            eq_(fkey1['referred_schema'], expected_schema)
            eq_(fkey1['referred_table'], users.name)
            eq_(fkey1['referred_columns'], ['user_id', ])
            eq_(fkey1['constrained_columns'], ['parent_user_id'])

        # addresses
        addr_fkeys = insp.get_foreign_keys(addresses.name,
                                           schema=schema)
        fkey1 = addr_fkeys[0]

        with testing.requires.named_constraints.fail_if():
            self.assert_(fkey1['name'] is not None)

        eq_(fkey1['referred_schema'], expected_schema)
        eq_(fkey1['referred_table'], users.name)
        eq_(fkey1['referred_columns'], ['user_id', ])
        eq_(fkey1['constrained_columns'], ['remote_user_id'])

    @testing.requires.foreign_key_constraint_reflection
    def test_get_foreign_keys(self):
        """Foreign keys in the default schema."""
        self._test_get_foreign_keys()

    @testing.requires.foreign_key_constraint_reflection
    @testing.requires.schemas
    def test_get_foreign_keys_with_schema(self):
        """Foreign keys in ``test_schema``."""
        self._test_get_foreign_keys(schema='test_schema')

    @testing.provide_metadata
    def _test_get_indexes(self, schema=None):
        """Check that both fixture indexes on ``users`` are reflected with
        the expected name, uniqueness and column order.

        :param schema: schema to reflect from, ``None`` for the default.
        """
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(self.metadata.bind)
        indexes = insp.get_indexes('users', schema=schema)
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'}
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            # only the expected keys are compared; extra reflected keys
            # are ignored
            for key in e_index:
                eq_(e_index[key], index[key])

    @testing.requires.index_reflection
    def test_get_indexes(self):
        """Indexes in the default schema."""
        self._test_get_indexes()

    @testing.requires.index_reflection
    @testing.requires.schemas
    def test_get_indexes_with_schema(self):
        """Indexes in ``test_schema``."""
        self._test_get_indexes(schema='test_schema')

    @testing.provide_metadata
    def _test_get_view_definition(self, schema=None):
        """Check that a non-empty definition is reflected for both views.

        :param schema: schema to reflect from, ``None`` for the default.
        """
        insp = inspect(self.metadata.bind)
        v1 = insp.get_view_definition('users_v', schema=schema)
        self.assert_(v1)
        v2 = insp.get_view_definition('email_addresses_v', schema=schema)
        self.assert_(v2)

    @testing.requires.view_reflection
    def test_get_view_definition(self):
        """View definitions in the default schema."""
        self._test_get_view_definition()

    @testing.requires.view_reflection
    @testing.requires.schemas
    def test_get_view_definition_with_schema(self):
        """View definitions in ``test_schema``."""
        self._test_get_view_definition(schema='test_schema')

    @testing.only_on("postgresql", "PG specific feature")
    @testing.provide_metadata
    def _test_get_table_oid(self, table_name, schema=None):
        """Check that ``get_table_oid()`` returns an integer.

        :param table_name: name of the table to look up.
        :param schema: schema to look in, ``None`` for the default.
        """
        insp = inspect(self.metadata.bind)
        oid = insp.get_table_oid(table_name, schema)
        # accept either of Python 2's integer types
        self.assert_(isinstance(oid, (int, long)))

    def test_get_table_oid(self):
        """Table OID lookup in the default schema."""
        self._test_get_table_oid('users')

    @testing.requires.schemas
    def test_get_table_oid_with_schema(self):
        """Table OID lookup in ``test_schema``."""
        self._test_get_table_oid('users', schema='test_schema')

    @testing.provide_metadata
    def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        primary key column.

        """

        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
                ('users', 'user_id'),
                ('email_addresses', 'address_id'),
                ('dingalings', 'dingaling_id'),
            ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            # a missing 'autoincrement' key counts as acceptable
            assert id_.get('autoincrement', True)
 
428
 
 
429
 
 
430
 
 
431
# The two test classes defined above are this module's exported names.
__all__ = ('ComponentReflectionTest', 'HasTableTest')