2
from sqlalchemy import *
3
from test import fixture
4
from migrate import changeset
5
from migrate.changeset import *
6
from migrate.changeset.schema import _ColumnDelta
7
from sqlalchemy.databases import information_schema
11
class TestAddDropColumn(fixture.DB):
12
level=fixture.DB.CONNECT
14
# We'll be adding the 'data' column
15
table_name = 'tmp_adddropcol'
18
def _setup(self, url):
19
super(TestAddDropColumn, self)._setup(url)
21
self.table = Table(self.table_name,self.meta,
22
Column('id',Integer,primary_key=True),
24
self.meta.bind = self.engine
25
if self.engine.has_table(self.table.name):
30
if self.engine.has_table(self.table.name):
36
super(TestAddDropColumn,self)._teardown()
38
def run_(self,create_column_func,drop_column_func,*col_p,**col_k):
41
def _assert_numcols(expected,type_):
42
result = len(self.table.c)
44
self.assertEquals(result,expected,
45
"# %s cols incorrect: %s != %s"%(type_,result,expected))
46
if not col_k.get('primary_key',None):
48
# new primary key: check its length too
49
result = len(self.table.primary_key)
50
self.assertEquals(result,expected,
51
"# %s pks incorrect: %s != %s"%(type_,result,expected))
52
def assert_numcols(expected):
53
# number of cols should be correct in table object and in database
54
# Changed: create/drop shouldn't mess with the objects
55
#_assert_numcols(expected,'object')
56
# Detect # database cols via autoload
58
del self.meta.tables[self.table_name]
59
self.table=Table(self.table_name,self.meta,autoload=True)
60
_assert_numcols(expected,'database')
65
col = Column(col_name,*col_p,**col_k)
66
create_column_func(col)
67
#create_column(col,self.table)
69
self.assertEquals(getattr(self.table.c,col_name),col)
70
#drop_column(col,self.table)
71
col = getattr(self.table.c,col_name)
76
def test_undefined(self):
77
"""Add/drop columns not yet defined in the table"""
79
return create_column(col,self.table)
81
return drop_column(col,self.table)
82
return self.run_(add_func,drop_func)
85
def test_defined(self):
86
"""Add/drop columns already defined in the table"""
89
self.table = Table(self.table_name,self.meta,
90
Column('id',Integer,primary_key=True),
93
return create_column(col,self.table)
95
return drop_column(col,self.table)
96
return self.run_(add_func,drop_func)
99
def test_method_bound(self):
100
"""Add/drop columns via column methods; columns bound to a table
101
ie. no table parameter passed to function
104
self.assert_(col.table is None,col.table)
105
self.table.append_column(col)
108
#self.assert_(col.table is None,col.table)
109
#self.table.append_column(col)
111
return self.run_(add_func,drop_func)
114
def test_method_notbound(self):
115
"""Add/drop columns via column methods; columns not bound to a table"""
117
return col.create(self.table)
119
return col.drop(self.table)
120
return self.run_(add_func,drop_func)
123
def test_tablemethod_obj(self):
124
"""Add/drop columns via table methods; by column object"""
126
return self.table.create_column(col)
128
return self.table.drop_column(col)
129
return self.run_(add_func,drop_func)
132
def test_tablemethod_name(self):
133
"""Add/drop columns via table methods; by column name"""
135
# must be bound to table
136
self.table.append_column(col)
137
return self.table.create_column(col.name)
139
# Not necessarily bound to table
140
return self.table.drop_column(col.name)
141
return self.run_(add_func,drop_func)
144
def test_byname(self):
145
"""Add/drop columns via functions; by table object and column name"""
147
self.table.append_column(col)
148
return create_column(col.name,self.table)
150
return drop_column(col.name,self.table)
151
return self.run_(add_func,drop_func)
155
"""Can create columns with foreign keys"""
156
reftable = Table('tmp_ref',self.meta,
157
Column('id',Integer,primary_key=True),
160
if self.engine.has_table(reftable.name):
164
self.table.append_column(col)
165
return create_column(col.name,self.table)
167
ret = drop_column(col.name,self.table)
168
if self.engine.has_table(reftable.name):
171
if self.url.startswith('sqlite'):
172
self.assertRaises(changeset.exceptions.NotSupportedError,
173
self.run_, add_func, drop_func, Integer,
174
ForeignKey(reftable.c.id))
176
return self.run_(add_func,drop_func,Integer,
177
ForeignKey(reftable.c.id))
180
class TestRename(fixture.DB):
181
level=fixture.DB.CONNECT
184
def _setup(self, url):
185
super(TestRename, self)._setup(url)
186
self.meta.bind = self.engine #self.meta.connect(self.engine)
189
def test_rename_table(self):
190
"""Tables can be renamed"""
191
#self.engine.echo=True
196
self.column = Column(name1,Integer)
198
self.table = Table(name1,self.meta,self.column)
199
self.index = Index(xname1,self.column,unique=False)
200
if self.engine.has_table(self.table.name):
202
if self.engine.has_table(name2):
203
tmp = Table(name2,self.meta,autoload=True)
209
def assert_table_name(expected,skip_object_check=False):
210
"""Refresh a table via autoload
211
SA has changed some since this test was written; we now need to do
212
meta.clear() upon reloading a table - clear all rather than a
213
select few. So, this works only if we're working with one table at
214
a time (else, others will vanish too).
216
if not skip_object_check:
218
self.assertEquals(self.table.name,expected)
219
newname = self.table.name
221
# we know the object's name isn't consistent: just assign it
224
#table = self.refresh_table(self.table,newname)
226
self.table = Table(newname, self.meta, autoload=True)
227
self.assertEquals(self.table.name,expected)
228
def assert_index_name(expected,skip_object_check=False):
229
if not skip_object_check:
231
self.assertEquals(self.index.name,expected)
233
# object is inconsistent
234
self.index.name = expected
240
assert_table_name(name1)
241
rename_table(self.table,name2)
242
assert_table_name(name2)
243
self.table.rename(name1)
244
assert_table_name(name1)
245
# ..by just the string
246
rename_table(name1,name2,engine=self.engine)
247
assert_table_name(name2,True) # object not updated
250
if self.url.startswith('sqlite') or self.url.startswith('mysql'):
251
self.assertRaises(changeset.exceptions.NotSupportedError,
252
self.index.rename,xname2)
254
assert_index_name(xname1)
255
rename_index(self.index,xname2,engine=self.engine)
256
assert_index_name(xname2)
257
self.index.rename(xname1)
258
assert_index_name(xname1)
259
# ..by just the string
260
rename_index(xname1,xname2,engine=self.engine)
261
assert_index_name(xname2,True)
265
if self.table.exists():
268
class TestColumnChange(fixture.DB):
269
level=fixture.DB.CONNECT
270
table_name = 'tmp_colchange'
272
def _setup(self, url):
273
super(TestColumnChange, self)._setup(url)
274
self.meta = MetaData(self.engine)
275
self.table = Table(self.table_name,self.meta,
276
Column('id',Integer,primary_key=True),
277
Column('data',String(40),server_default=DefaultClause("tluafed"),nullable=True),
279
if self.table.exists():
283
except sqlalchemy.exceptions.SQLError,e:
284
# SQLite: database schema has changed
285
if not self.url.startswith('sqlite://'):
288
if self.table.exists():
290
self.table.drop(self.engine)
291
except sqlalchemy.exceptions.SQLError,e:
292
# SQLite: database schema has changed
293
if not self.url.startswith('sqlite://'):
295
#self.engine.echo=False
296
super(TestColumnChange, self)._teardown()
299
def test_rename(self):
300
"""Can rename a column"""
301
def num_rows(col,content):
302
return len(list(self.table.select(col==content).execute()))
303
# Table content should be preserved in changed columns
305
self.engine.execute(self.table.insert(),data=content,id=42)
306
self.assertEquals(num_rows(self.table.c.data,content),1)
308
# ...as a function, given a column object and the new name
309
alter_column(self.table.c.data, name='atad')
310
self.refresh_table(self.table.name)
311
self.assert_('data' not in self.table.c.keys())
312
self.assert_('atad' in self.table.c.keys())
313
#self.assertRaises(AttributeError,getattr,self.table.c,'data')
314
self.table.c.atad # Should not raise exception
315
self.assertEquals(num_rows(self.table.c.atad,content),1)
317
# ...as a method, given a new name
318
self.table.c.atad.alter(name='data')
319
self.refresh_table(self.table.name)
320
self.assert_('atad' not in self.table.c.keys())
321
self.table.c.data # Should not raise exception
322
self.assertEquals(num_rows(self.table.c.data,content),1)
324
# ...as a function, given a new object
325
col = Column('atad',String(40),server_default=self.table.c.data.server_default)
326
alter_column(self.table.c.data, col)
327
self.refresh_table(self.table.name)
328
self.assert_('data' not in self.table.c.keys())
329
self.table.c.atad # Should not raise exception
330
self.assertEquals(num_rows(self.table.c.atad,content),1)
332
# ...as a method, given a new object
333
col = Column('data',String(40),server_default=self.table.c.atad.server_default)
334
self.table.c.atad.alter(col)
335
self.refresh_table(self.table.name)
336
self.assert_('atad' not in self.table.c.keys())
337
self.table.c.data # Should not raise exception
338
self.assertEquals(num_rows(self.table.c.data,content),1)
342
"""Can add/drop foreign key constraints to/from a column
345
self.assert_(self.table.c.data.foreign_key is None)
348
self.table.c.data.alter(foreign_key=ForeignKey(self.table.c.id))
349
self.refresh_table(self.table.name)
350
self.assert_(self.table.c.data.foreign_key is not None)
353
self.table.c.data.alter(foreign_key=None)
354
self.refresh_table(self.table.name)
355
self.assert_(self.table.c.data.foreign_key is None)
359
"""Can change a column's type"""
360
# Entire column definition given
361
self.table.c.data.alter(Column('data',String(42)))
362
self.refresh_table(self.table.name)
363
self.assert_(isinstance(self.table.c.data.type,String))
364
self.assertEquals(self.table.c.data.type.length,42)
367
self.table.c.data.alter(type=String(21))
368
self.refresh_table(self.table.name)
369
self.assert_(isinstance(self.table.c.data.type,String))
370
self.assertEquals(self.table.c.data.type.length,21)
373
self.assert_(isinstance(self.table.c.id.type,Integer))
374
self.assertEquals(self.table.c.id.nullable,False)
375
self.table.c.id.alter(type=String(20))
376
self.assertEquals(self.table.c.id.nullable,False)
377
self.refresh_table(self.table.name)
378
self.assert_(isinstance(self.table.c.id.type,String))
380
@fixture.usedb(not_supported='mysql')
381
def test_default(self):
382
"""Can change a column's server_default value (DefaultClauses only)
383
Only DefaultClauses are changed here: others are managed by the
386
#self.engine.echo=True
387
self.assertEquals(self.table.c.data.server_default.arg,'tluafed')
389
# Just the new default
390
default = 'my_default'
391
self.table.c.data.alter(server_default=DefaultClause(default))
392
self.refresh_table(self.table.name)
393
#self.assertEquals(self.table.c.data.server_default.arg,default)
394
# TextClause returned by autoload
395
self.assert_(default in str(self.table.c.data.server_default.arg))
398
default = 'your_default'
399
self.table.c.data.alter(Column('data',String(40),server_default=DefaultClause(default)))
400
self.refresh_table(self.table.name)
401
self.assert_(default in str(self.table.c.data.server_default.arg))
404
self.table.c.data.alter(server_default=None)
405
self.refresh_table(self.table.name)
406
# server_default isn't necessarily None for Oracle
407
#self.assert_(self.table.c.data.server_default is None,self.table.c.data.server_default)
408
self.engine.execute(self.table.insert(),id=11)
409
row = self.table.select().execute().fetchone()
410
self.assert_(row['data'] is None,row['data'])
415
"""Can change a column's null constraint"""
416
self.assertEquals(self.table.c.data.nullable,True)
419
self.table.c.data.alter(Column('data',String(40),nullable=False))
420
self.table.nullable=None
421
self.refresh_table(self.table.name)
422
self.assertEquals(self.table.c.data.nullable,False)
424
# Just the new status
425
self.table.c.data.alter(nullable=True)
426
self.refresh_table(self.table.name)
427
self.assertEquals(self.table.c.data.nullable,True)
431
"""Can add/drop a column to/from its table's primary key
434
self.engine.echo = True
435
self.assertEquals(len(self.table.primary_key),1)
437
# Entire column definition
438
self.table.c.data.alter(Column('data',String,primary_key=True))
439
self.refresh_table(self.table.name)
440
self.assertEquals(len(self.table.primary_key),2)
442
# Just the new status
443
self.table.c.data.alter(primary_key=False)
444
self.refresh_table(self.table.name)
445
self.assertEquals(len(self.table.primary_key),1)
447
class TestColumnDelta(fixture.Base):
448
def test_deltas(self):
449
def mkcol(name='id',type=String,*p,**k):
450
return Column(name,type,*p,**k)
451
col_orig = mkcol(primary_key=True)
453
def verify(expected,original,*p,**k):
454
delta = _ColumnDelta(original,*p,**k)
455
result = delta.keys()
457
self.assertEquals(expected,result)
461
verify(['name'],col_orig,'ids')
462
# Parameters are always executed, even if they're 'unchanged'
463
# (We can't assume given column is up-to-date)
464
verify(['name','primary_key','type'],col_orig,'id',Integer,primary_key=True)
465
verify(['name','primary_key','type'],col_orig,name='id',type=Integer,primary_key=True)
467
# Can compare two columns and find differences
468
col_new = mkcol(name='ids',primary_key=True)
469
verify([],col_orig,col_orig)
470
verify(['name'],col_orig,col_orig,'ids')
471
verify(['name'],col_orig,col_orig,name='ids')
472
verify(['name'],col_orig,col_new)
473
verify(['name','type'],col_orig,col_new,type=String)
474
# Change name, given an up-to-date definition and the current name
475
delta = verify(['name'],col_new,current_name='id')
476
self.assertEquals(delta.get('name'),'ids')
477
# Change other params at the same time
478
verify(['name','type'],col_new,current_name='id',type=String)
480
verify([],mkcol(type=String),mkcol(type=String))
481
verify(['type'],mkcol(type=String),mkcol(type=Integer))
482
verify(['type'],mkcol(type=String),mkcol(type=String(42)))
483
verify([],mkcol(type=String(42)),mkcol(type=String(42)))
484
verify(['type'],mkcol(type=String(24)),mkcol(type=String(42)))
486
verify(['primary_key'],mkcol(nullable=False),mkcol(primary_key=True))
487
# PK implies nullable=False
488
verify(['nullable','primary_key'],mkcol(nullable=True),mkcol(primary_key=True))
489
verify([],mkcol(primary_key=True),mkcol(primary_key=True))
490
verify(['nullable'],mkcol(nullable=True),mkcol(nullable=False))
491
verify([],mkcol(nullable=True),mkcol(nullable=True))
492
verify(['default'],mkcol(default=None),mkcol(default='42'))
493
verify([],mkcol(default=None),mkcol(default=None))
494
verify([],mkcol(default='42'),mkcol(default='42'))