from __future__ import with_statement

import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
from sqlalchemy import inspect
from sqlalchemy import MetaData, Integer, String
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import engines, fixtures
from sqlalchemy.testing import config
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.testing import eq_, assert_raises_message
from sqlalchemy import testing

from sqlalchemy.schema import DDL, Index
from sqlalchemy import event
18
# Module-level placeholders, both initialised to None.
# NOTE(review): nothing visible in this file reads these globals; the
# functions below use same-named parameters/locals. Confirm before removing.
metadata, users = None, None
21
class HasTableTest(fixtures.TablesTest):
    """Check the dialect-level ``has_table()`` existence test."""

    # NOTE(review): the decorator was not visible in the damaged source; it
    # is restored because the hook takes ``cls`` as its first argument.
    @classmethod
    def define_tables(cls, metadata):
        """Declare the single fixture table ``test_table`` on *metadata*."""
        Table(
            'test_table', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', String(50)),
        )

    def test_has_table(self):
        # has_table() must be true for the fixture table and false for a
        # name that was never declared.
        with config.db.begin() as conn:
            assert config.db.dialect.has_table(conn, "test_table")
            assert not config.db.dialect.has_table(conn, "nonexistent_table")
36
class ComponentReflectionTest(fixtures.TablesTest):
    """Reflection tests for the ``Inspector`` API.

    Fixture tables ``users``, ``email_addresses`` and ``dingalings`` (plus
    optional indexes and ``*_v`` views) are declared once without a schema
    and, when the backend supports schemas, again under ``test_schema``.

    NOTE(review): this class was restored from a copy whose indentation was
    lost and from which some lines were missing.  Every place where content
    had to be inferred is marked with a ``NOTE(review)`` comment.
    """

    # NOTE(review): presumably switches off the per-test insert/delete
    # steps of TablesTest -- confirm against the fixtures module.
    run_inserts = run_deletes = None

    @classmethod
    def define_tables(cls, metadata):
        """Declare the fixtures schema-less and, if supported, in a schema."""
        cls.define_reflected_tables(metadata, None)
        if testing.requires.schemas.enabled:
            cls.define_reflected_tables(metadata, "test_schema")

    @classmethod
    def define_reflected_tables(cls, metadata, schema):
        """Declare the three fixture tables (and indexes/views) in *schema*.

        :param metadata: MetaData collection receiving the tables.
        :param schema: schema name, or ``None`` for the default schema.
        """
        # define_tables() passes schema=None, so the prefix must be guarded;
        # an unconditional ``schema + "."`` raises TypeError.
        schema_prefix = (schema + ".") if schema else ""

        # NOTE(review): these Table keyword arguments were not visible in
        # the damaged source.  ``schema`` is required for the second
        # (schema-qualified) set of tables to coexist in one MetaData;
        # ``test_needs_fk`` is assumed so FK-capable storage is used --
        # confirm both.
        table_kw = dict(schema=schema, test_needs_fk=True)

        user_columns = [
            Column('user_id', sa.INT, primary_key=True),
            Column('test1', sa.CHAR(5), nullable=False),
            Column('test2', sa.Float(5), nullable=False),
        ]
        if testing.requires.self_referential_foreign_keys.enabled:
            # Only backends that allow it get the self-referencing column.
            user_columns.append(
                Column('parent_user_id', sa.Integer,
                       sa.ForeignKey('%susers.user_id' % schema_prefix)))
        users = Table('users', metadata, *user_columns, **table_kw)

        Table(
            "dingalings", metadata,
            Column('dingaling_id', sa.Integer, primary_key=True),
            Column('address_id', sa.Integer,
                   sa.ForeignKey('%semail_addresses.address_id' %
                                 schema_prefix)),
            Column('data', sa.String(30)),
            **table_kw
        )
        Table(
            'email_addresses', metadata,
            Column('address_id', sa.Integer),
            Column('remote_user_id', sa.Integer,
                   sa.ForeignKey(users.c.user_id)),
            Column('email_address', sa.String(20)),
            sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
            **table_kw
        )

        if testing.requires.index_reflection.enabled:
            cls.define_index(metadata, users)
        if testing.requires.view_reflection.enabled:
            cls.define_views(metadata, schema)

    @classmethod
    def define_index(cls, metadata, users):
        """Attach the two multi-column indexes to the *users* table."""
        Index("users_t_idx", users.c.test1, users.c.test2)
        Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)

    @classmethod
    def define_views(cls, metadata, schema):
        """Arrange for ``users_v`` / ``email_addresses_v`` views to exist.

        NOTE(review): the statements that consumed ``query`` and the
        ``DROP VIEW`` DDL were missing from the damaged source; hooking them
        to the MetaData create/drop events is inferred from the file's
        otherwise unused ``event`` and ``DDL`` imports -- confirm.
        """
        for table_name in ('users', 'email_addresses'):
            fullname = table_name
            if schema:
                fullname = "%s.%s" % (schema, table_name)
            view_name = fullname + '_v'
            query = "CREATE VIEW %s AS SELECT * FROM %s" % (
                view_name, fullname)
            event.listen(metadata, "after_create", DDL(query))
            event.listen(metadata, "before_drop",
                         DDL("DROP VIEW %s" % view_name))

    @testing.requires.schema_reflection
    def test_get_schema_names(self):
        insp = inspect(testing.db)

        self.assert_('test_schema' in insp.get_schema_names())

    @testing.requires.schema_reflection
    def test_dialect_initialize(self):
        engine = engines.testing_engine()
        assert not hasattr(engine.dialect, 'default_schema_name')
        # NOTE(review): the statement between the two asserts was missing in
        # the damaged source; building an Inspector is assumed to be what
        # initialises the dialect -- confirm.
        inspect(engine)
        assert hasattr(engine.dialect, 'default_schema_name')

    @testing.requires.schema_reflection
    def test_get_default_schema_name(self):
        insp = inspect(testing.db)
        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)

    @testing.provide_metadata
    def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        """Compare reflected table or view names with the fixture names.

        :param schema: schema to inspect, ``None`` for the default one.
        :param table_type: ``'table'`` or ``'view'``.
        :param order_by: forwarded to ``Inspector.get_table_names``; with
         ``'foreign_key'`` the result order itself is asserted.
        """
        # NOTE(review): ``meta`` was unbound in the damaged source;
        # ``self.metadata`` is assumed to be supplied by
        # ``@testing.provide_metadata`` -- confirm.
        meta = self.metadata
        insp = inspect(meta.bind)
        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            answer = ['email_addresses_v', 'users_v']
            # Previously ``answer`` was built but never compared.
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema, order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)

    @testing.requires.table_reflection
    def test_get_table_names(self):
        self._test_get_table_names()

    @testing.requires.table_reflection
    @testing.requires.foreign_key_constraint_reflection
    def test_get_table_names_fks(self):
        self._test_get_table_names(order_by='foreign_key')

    @testing.requires.table_reflection
    @testing.requires.schemas
    def test_get_table_names_with_schema(self):
        self._test_get_table_names('test_schema')

    @testing.requires.view_reflection
    def test_get_view_names(self):
        self._test_get_table_names(table_type='view')

    @testing.requires.view_reflection
    @testing.requires.schemas
    def test_get_view_names_with_schema(self):
        self._test_get_table_names('test_schema', table_type='view')

    def _test_get_columns(self, schema=None, table_type='table'):
        """Check reflected column names, types and defaults.

        For ``users`` and ``email_addresses`` (or their ``*_v`` views when
        *table_type* is ``'view'``) the reflected columns must come back in
        declaration order, with a type sharing a generic base with the
        declared type, and with no default on non-primary-key columns.
        """
        meta = MetaData(testing.db)
        users, addresses = self.tables.users, self.tables.email_addresses
        table_names = ['users', 'email_addresses']
        if table_type == 'view':
            table_names = ['users_v', 'email_addresses_v']
        insp = inspect(meta.bind)

        # NOTE(review): the members of this list were missing from the
        # damaged source; these are the generic type families assumed from
        # the surviving comment -- confirm.
        generic_types = [
            sql_types.Integer,
            sql_types.Numeric,
            sql_types.DateTime,
            sql_types.Date,
            sql_types.Time,
            sql_types.String,
            sql_types._Binary,
        ]

        for table_name, table in zip(table_names, (users, addresses)):
            cols = insp.get_columns(table_name, schema=schema)
            self.assert_(len(cols) > 0, len(cols))

            # Reflected columns are compared positionally, i.e. they are
            # expected in declaration order.
            for i, col in enumerate(table.columns):
                eq_(col.name, cols[i]['name'])
                ctype = cols[i]['type'].__class__
                ctype_def = col.type
                if isinstance(ctype_def, sa.types.TypeEngine):
                    ctype_def = ctype_def.__class__

                # Oracle returns Date for DateTime.
                if testing.against('oracle') and ctype_def \
                        in (sql_types.Date, sql_types.DateTime):
                    ctype_def = sql_types.Date

                # assert that the desired type and return type share
                # a base within one of the generic types.
                shared_bases = set(ctype.__mro__).intersection(
                    ctype_def.__mro__).intersection(generic_types)
                self.assert_(
                    len(shared_bases) > 0,
                    '%s(%s), %s(%s)' % (
                        col.name, col.type, cols[i]['name'], ctype))

                if not col.primary_key:
                    assert cols[i]['default'] is None

    @testing.requires.table_reflection
    def test_get_columns(self):
        self._test_get_columns()

    @testing.requires.table_reflection
    @testing.requires.schemas
    def test_get_columns_with_schema(self):
        self._test_get_columns(schema='test_schema')

    @testing.requires.view_reflection
    def test_get_view_columns(self):
        self._test_get_columns(table_type='view')

    @testing.requires.view_reflection
    @testing.requires.schemas
    def test_get_view_columns_with_schema(self):
        self._test_get_columns(schema='test_schema', table_type='view')

    @testing.provide_metadata
    def _test_get_pk_constraint(self, schema=None):
        """Check reflected primary-key columns (and name, where supported)."""
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        users, addresses = self.tables.users, self.tables.email_addresses
        insp = inspect(meta.bind)

        users_cons = insp.get_pk_constraint(users.name, schema=schema)
        users_pkeys = users_cons['constrained_columns']
        eq_(users_pkeys, ['user_id'])

        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
        addr_pkeys = addr_cons['constrained_columns']
        eq_(addr_pkeys, ['address_id'])

        with testing.requires.reflects_pk_names.fail_if():
            eq_(addr_cons['name'], 'email_ad_pk')

    @testing.requires.primary_key_constraint_reflection
    def test_get_pk_constraint(self):
        self._test_get_pk_constraint()

    @testing.requires.table_reflection
    @testing.requires.primary_key_constraint_reflection
    @testing.requires.schemas
    def test_get_pk_constraint_with_schema(self):
        self._test_get_pk_constraint(schema='test_schema')

    @testing.requires.table_reflection
    @testing.provide_metadata
    def test_deprecated_get_primary_keys(self):
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        users = self.tables.users
        insp = Inspector(meta.bind)
        assert_raises_message(
            sa_exc.SADeprecationWarning,
            "Call to deprecated method get_primary_keys."
            " Use get_pk_constraint instead.",
            insp.get_primary_keys, users.name
        )

    @testing.provide_metadata
    def _test_get_foreign_keys(self, schema=None):
        """Check reflected foreign keys of ``users`` and ``email_addresses``."""
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        users, addresses = self.tables.users, self.tables.email_addresses
        insp = inspect(meta.bind)
        expected_schema = schema

        # ``users`` only carries a foreign key when self-referential FKs are
        # supported (see define_reflected_tables); otherwise indexing
        # ``users_fkeys[0]`` would raise IndexError, so the whole section is
        # guarded.
        if testing.requires.self_referential_foreign_keys.enabled:
            users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
            fkey1 = users_fkeys[0]

            with testing.requires.named_constraints.fail_if():
                self.assert_(fkey1['name'] is not None)

            eq_(fkey1['referred_schema'], expected_schema)
            eq_(fkey1['referred_table'], users.name)
            eq_(fkey1['referred_columns'], ['user_id', ])
            eq_(fkey1['constrained_columns'], ['parent_user_id'])

        addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
        fkey1 = addr_fkeys[0]

        with testing.requires.named_constraints.fail_if():
            self.assert_(fkey1['name'] is not None)

        eq_(fkey1['referred_schema'], expected_schema)
        eq_(fkey1['referred_table'], users.name)
        eq_(fkey1['referred_columns'], ['user_id', ])
        eq_(fkey1['constrained_columns'], ['remote_user_id'])

    @testing.requires.foreign_key_constraint_reflection
    def test_get_foreign_keys(self):
        self._test_get_foreign_keys()

    @testing.requires.foreign_key_constraint_reflection
    @testing.requires.schemas
    def test_get_foreign_keys_with_schema(self):
        self._test_get_foreign_keys(schema='test_schema')

    @testing.provide_metadata
    def _test_get_indexes(self, schema=None):
        """Check that both fixture indexes on ``users`` are reflected."""
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(meta.bind)
        indexes = insp.get_indexes('users', schema=schema)
        # NOTE(review): the 'unique' entries were missing from the damaged
        # source; False is assumed because define_index() creates plain
        # (non-unique) indexes -- confirm.
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'},
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            for key in e_index:
                eq_(e_index[key], index[key])

    @testing.requires.index_reflection
    def test_get_indexes(self):
        self._test_get_indexes()

    @testing.requires.index_reflection
    @testing.requires.schemas
    def test_get_indexes_with_schema(self):
        self._test_get_indexes(schema='test_schema')

    @testing.provide_metadata
    def _test_get_view_definition(self, schema=None):
        """Check that a definition is reflected for both fixture views."""
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        view_name1 = 'users_v'
        view_name2 = 'email_addresses_v'
        insp = inspect(meta.bind)
        # NOTE(review): the checks on v1/v2 were missing from the damaged
        # source; a truthiness assertion is assumed -- confirm.
        v1 = insp.get_view_definition(view_name1, schema=schema)
        self.assert_(v1)
        v2 = insp.get_view_definition(view_name2, schema=schema)
        self.assert_(v2)

    @testing.requires.view_reflection
    def test_get_view_definition(self):
        self._test_get_view_definition()

    @testing.requires.view_reflection
    @testing.requires.schemas
    def test_get_view_definition_with_schema(self):
        self._test_get_view_definition(schema='test_schema')

    @testing.only_on("postgresql", "PG specific feature")
    @testing.provide_metadata
    def _test_get_table_oid(self, table_name, schema=None):
        """Check that ``get_table_oid`` returns an integer for *table_name*."""
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        insp = inspect(meta.bind)
        oid = insp.get_table_oid(table_name, schema)
        # ``long`` exists on Python 2 only, which this module targets (see
        # the ``with_statement`` future import at the top of the file).
        self.assert_(isinstance(oid, (int, long)))

    def test_get_table_oid(self):
        self._test_get_table_oid('users')

    @testing.requires.schemas
    def test_get_table_oid_with_schema(self):
        self._test_get_table_oid('users', schema='test_schema')

    @testing.provide_metadata
    def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        column.

        """
        # NOTE(review): ``meta`` was unbound in the damaged source; see
        # _test_get_table_names.
        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
            ('users', 'user_id'),
            ('email_addresses', 'address_id'),
            ('dingalings', 'dingaling_id'),
        ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            # A missing 'autoincrement' key counts as acceptable.
            assert id_.get('autoincrement', True)
431
# Public names of this module: the two test classes defined above.
__all__ = ('ComponentReflectionTest', 'HasTableTest')