from __future__ import with_statement

import datetime
import decimal
import os

from sqlalchemy.testing import eq_, engines, pickleable
from sqlalchemy import *
from sqlalchemy import types, schema
from sqlalchemy.databases import mssql
from sqlalchemy.dialects.mssql.base import TIME
from sqlalchemy.testing import fixtures, \
    AssertsExecutionResults, ComparesTables
from sqlalchemy import testing
from sqlalchemy.testing import emits_warning_on
from sqlalchemy.util import b
class TimeTypeTest(fixtures.TestBase):
    """Unit tests for the MSSQL ``TIME`` type's string result processor."""

    def test_result_processor_no_microseconds(self):
        # A plain 'HH:MM:SS' string (no fractional seconds) converts
        # directly to a datetime.time with microsecond == 0.
        expected = datetime.time(12, 34, 56)
        self._assert_result_processor(expected, '12:34:56')

    def test_result_processor_too_many_microseconds(self):
        # microsecond must be in 0..999999, should truncate (6 vs 7 digits)
        expected = datetime.time(12, 34, 56, 123456)
        self._assert_result_processor(expected, '12:34:56.1234567')

    def _assert_result_processor(self, expected, value):
        # Feed *value* through TIME's result processor (dialect and
        # coltype are not consulted for string input) and compare the
        # converted result against *expected*.
        mssql_time_type = TIME()
        result_processor = mssql_time_type.result_processor(None, None)
        eq_(expected, result_processor(value))
35
class TypeDDLTest(fixtures.TestBase):
36
def test_boolean(self):
37
"Exercise type specification for boolean type."
40
# column type, args, kwargs, expected ddl
46
table_args = ['test_mssql_boolean', metadata]
47
for index, spec in enumerate(columns):
48
type_, args, kw, res = spec
50
Column('c%s' % index, type_(*args, **kw), nullable=None))
52
boolean_table = Table(*table_args)
53
dialect = mssql.dialect()
54
gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table))
56
for col in boolean_table.c:
57
index = int(col.name[1:])
58
testing.eq_(gen.get_column_specification(col),
59
"%s %s" % (col.name, columns[index][3]))
60
self.assert_(repr(col))
63
def test_numeric(self):
64
"Exercise type specification and options for numeric types."
67
# column type, args, kwargs, expected ddl
68
(types.NUMERIC, [], {},
70
(types.NUMERIC, [None], {},
72
(types.NUMERIC, [12, 4], {},
77
(types.Float, [None], {},
79
(types.Float, [12], {},
81
(mssql.MSReal, [], {},
84
(types.Integer, [], {},
86
(types.BigInteger, [], {},
88
(mssql.MSTinyInteger, [], {},
90
(types.SmallInteger, [], {},
95
table_args = ['test_mssql_numeric', metadata]
96
for index, spec in enumerate(columns):
97
type_, args, kw, res = spec
99
Column('c%s' % index, type_(*args, **kw), nullable=None))
101
numeric_table = Table(*table_args)
102
dialect = mssql.dialect()
103
gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table))
105
for col in numeric_table.c:
106
index = int(col.name[1:])
107
testing.eq_(gen.get_column_specification(col),
108
"%s %s" % (col.name, columns[index][3]))
109
self.assert_(repr(col))
113
"""Exercise COLLATE-ish options on string types."""
116
(mssql.MSChar, [], {},
118
(mssql.MSChar, [1], {},
120
(mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'},
121
'CHAR(1) COLLATE Latin1_General_CI_AS'),
123
(mssql.MSNChar, [], {},
125
(mssql.MSNChar, [1], {},
127
(mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'},
128
'NCHAR(1) COLLATE Latin1_General_CI_AS'),
130
(mssql.MSString, [], {},
132
(mssql.MSString, [1], {},
134
(mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'},
135
'VARCHAR(1) COLLATE Latin1_General_CI_AS'),
137
(mssql.MSNVarchar, [], {},
139
(mssql.MSNVarchar, [1], {},
141
(mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'},
142
'NVARCHAR(1) COLLATE Latin1_General_CI_AS'),
144
(mssql.MSText, [], {},
146
(mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'},
147
'TEXT COLLATE Latin1_General_CI_AS'),
149
(mssql.MSNText, [], {},
151
(mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'},
152
'NTEXT COLLATE Latin1_General_CI_AS'),
155
metadata = MetaData()
156
table_args = ['test_mssql_charset', metadata]
157
for index, spec in enumerate(columns):
158
type_, args, kw, res = spec
160
Column('c%s' % index, type_(*args, **kw), nullable=None))
162
charset_table = Table(*table_args)
163
dialect = mssql.dialect()
164
gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table))
166
for col in charset_table.c:
167
index = int(col.name[1:])
168
testing.eq_(gen.get_column_specification(col),
169
"%s %s" % (col.name, columns[index][3]))
170
self.assert_(repr(col))
def test_timestamp(self):
    """Exercise DDL generation for a TIMESTAMP column."""

    dialect = mssql.dialect()

    metadata = MetaData()
    spec, expected = (TIMESTAMP, 'TIMESTAMP')
    t = Table('mssql_ts', metadata,
              Column('id', Integer, primary_key=True),
              Column('t', spec, nullable=None))
    gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
    # The column specification should render the bare type name.
    testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
    # repr() must not raise for the compiled column.
    self.assert_(repr(t.c.t))
def test_money(self):
    """Exercise type specification for money types."""

    # column type, ctor args, ctor kwargs, expected DDL
    columns = [(mssql.MSMoney, [], {}, 'MONEY'),
               (mssql.MSSmallMoney, [], {}, 'SMALLMONEY')]
    metadata = MetaData()
    table_args = ['test_mssql_money', metadata]
    for index, spec in enumerate(columns):
        type_, args, kw, res = spec
        # nullable=None leaves NULL/NOT NULL unrendered in the DDL.
        table_args.append(Column('c%s' % index, type_(*args, **kw),
                          nullable=None))
    money_table = Table(*table_args)
    dialect = mssql.dialect()
    gen = dialect.ddl_compiler(dialect,
                               schema.CreateTable(money_table))
    for col in money_table.c:
        # Recover the spec index from the generated column name 'cN'.
        index = int(col.name[1:])
        testing.eq_(gen.get_column_specification(col), '%s %s'
                    % (col.name, columns[index][3]))
        self.assert_(repr(col))
208
def test_binary(self):
209
"Exercise type specification for binary types."
212
# column type, args, kwargs, expected ddl
213
(mssql.MSBinary, [], {},
215
(mssql.MSBinary, [10], {},
218
(types.BINARY, [], {},
220
(types.BINARY, [10], {},
223
(mssql.MSVarBinary, [], {},
225
(mssql.MSVarBinary, [10], {},
228
(types.VARBINARY, [10], {},
230
(types.VARBINARY, [], {},
233
(mssql.MSImage, [], {},
236
(mssql.IMAGE, [], {},
239
(types.LargeBinary, [], {},
243
metadata = MetaData()
244
table_args = ['test_mssql_binary', metadata]
245
for index, spec in enumerate(columns):
246
type_, args, kw, res = spec
247
table_args.append(Column('c%s' % index, type_(*args, **kw),
249
binary_table = Table(*table_args)
250
dialect = mssql.dialect()
251
gen = dialect.ddl_compiler(dialect,
252
schema.CreateTable(binary_table))
253
for col in binary_table.c:
254
index = int(col.name[1:])
255
testing.eq_(gen.get_column_specification(col), '%s %s'
256
% (col.name, columns[index][3]))
257
self.assert_(repr(col))
259
class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTables):
260
__only_on__ = 'mssql'
263
def setup_class(cls):
265
metadata = MetaData(testing.db)
270
@testing.fails_on_everything_except('mssql+pyodbc',
271
'this is some pyodbc-specific feature')
272
def test_decimal_notation(self):
273
numeric_table = Table('numeric_table', metadata, Column('id',
274
Integer, Sequence('numeric_id_seq',
275
optional=True), primary_key=True),
277
Numeric(precision=38, scale=20,
279
metadata.create_all()
280
test_items = [decimal.Decimal(d) for d in (
281
'1500000.00000000000000000000',
282
'-1500000.00000000000000000000',
284
'0.0000000000000000002',
286
'-0.0000000000000000002',
323
'000000000000000000012',
324
'000000000000.32E12',
325
'00000000000000.1E+12',
326
'000000000000.2E-32',
329
for value in test_items:
330
numeric_table.insert().execute(numericcol=value)
332
for value in select([numeric_table.c.numericcol]).execute():
333
assert value[0] in test_items, "%r not in test_items" % value[0]
335
def test_float(self):
336
float_table = Table('float_table', metadata, Column('id',
337
Integer, Sequence('numeric_id_seq',
338
optional=True), primary_key=True),
339
Column('floatcol', Float()))
340
metadata.create_all()
342
test_items = [float(d) for d in (
343
'1500000.00000000000000000000',
344
'-1500000.00000000000000000000',
346
'0.0000000000000000002',
348
'-0.0000000000000000002',
368
for value in test_items:
369
float_table.insert().execute(floatcol=value)
374
# todo this should suppress warnings, but it does not
375
@emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
376
def test_dates(self):
377
"Exercise type specification for date types."
380
# column type, args, kwargs, expected ddl
381
(mssql.MSDateTime, [], {},
385
'DATE', ['>=', (10,)]),
387
'DATE', ['>=', (10,)]),
389
'DATETIME', ['<', (10,)], mssql.MSDateTime),
390
(mssql.MSDate, [], {},
391
'DATE', ['>=', (10,)]),
392
(mssql.MSDate, [], {},
393
'DATETIME', ['<', (10,)], mssql.MSDateTime),
396
'TIME', ['>=', (10,)]),
398
'TIME', ['>=', (10,)]),
399
(mssql.MSTime, [], {},
400
'TIME', ['>=', (10,)]),
401
(mssql.MSTime, [1], {},
402
'TIME(1)', ['>=', (10,)]),
404
'DATETIME', ['<', (10,)], mssql.MSDateTime),
405
(mssql.MSTime, [], {},
406
'TIME', ['>=', (10,)]),
408
(mssql.MSSmallDateTime, [], {},
409
'SMALLDATETIME', []),
411
(mssql.MSDateTimeOffset, [], {},
412
'DATETIMEOFFSET', ['>=', (10,)]),
413
(mssql.MSDateTimeOffset, [1], {},
414
'DATETIMEOFFSET(1)', ['>=', (10,)]),
416
(mssql.MSDateTime2, [], {},
417
'DATETIME2', ['>=', (10,)]),
418
(mssql.MSDateTime2, [1], {},
419
'DATETIME2(1)', ['>=', (10,)]),
423
table_args = ['test_mssql_dates', metadata]
424
for index, spec in enumerate(columns):
425
type_, args, kw, res, requires = spec[0:5]
426
if requires and testing._is_excluded('mssql', *requires) \
428
c = Column('c%s' % index, type_(*args,
429
**kw), nullable=None)
430
testing.db.dialect.type_descriptor(c.type)
432
dates_table = Table(*table_args)
433
gen = testing.db.dialect.ddl_compiler(testing.db.dialect,
434
schema.CreateTable(dates_table))
435
for col in dates_table.c:
436
index = int(col.name[1:])
437
testing.eq_(gen.get_column_specification(col), '%s %s'
438
% (col.name, columns[index][3]))
439
self.assert_(repr(col))
440
dates_table.create(checkfirst=True)
441
reflected_dates = Table('test_mssql_dates',
442
MetaData(testing.db), autoload=True)
443
for col in reflected_dates.c:
444
self.assert_types_base(col, dates_table.c[col.key])
446
def test_date_roundtrip(self):
447
t = Table('test_dates', metadata,
448
Column('id', Integer,
449
Sequence('datetest_id_seq', optional=True),
451
Column('adate', Date),
452
Column('atime', Time),
453
Column('adatetime', DateTime))
454
metadata.create_all()
455
d1 = datetime.date(2007, 10, 30)
456
t1 = datetime.time(11, 2, 32)
457
d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
458
t.insert().execute(adate=d1, adatetime=d2, atime=t1)
459
t.insert().execute(adate=d2, adatetime=d2, atime=d2)
461
x = t.select().execute().fetchall()[0]
462
self.assert_(x.adate.__class__ == datetime.date)
463
self.assert_(x.atime.__class__ == datetime.time)
464
self.assert_(x.adatetime.__class__ == datetime.datetime)
468
t.insert().execute(adate=d1, adatetime=d2, atime=t1)
470
eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate
471
== d1).execute().fetchall(), [(d1, t1, d2)])
473
@emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
474
@testing.provide_metadata
475
def test_binary_reflection(self):
476
"Exercise type specification for binary types."
479
# column type, args, kwargs, expected ddl
480
(mssql.MSBinary, [], {},
482
(mssql.MSBinary, [10], {},
485
(types.BINARY, [], {},
487
(types.BINARY, [10], {},
490
(mssql.MSVarBinary, [], {},
492
(mssql.MSVarBinary, [10], {},
495
(types.VARBINARY, [10], {},
497
(types.VARBINARY, [], {},
500
(mssql.MSImage, [], {},
503
(mssql.IMAGE, [], {},
506
(types.LargeBinary, [], {},
510
metadata = self.metadata
511
table_args = ['test_mssql_binary', metadata]
512
for index, spec in enumerate(columns):
513
type_, args, kw, res = spec
514
table_args.append(Column('c%s' % index, type_(*args, **kw),
516
binary_table = Table(*table_args)
517
metadata.create_all()
518
reflected_binary = Table('test_mssql_binary',
519
MetaData(testing.db), autoload=True)
520
for col in reflected_binary.c:
521
c1 = testing.db.dialect.type_descriptor(col.type).__class__
523
testing.db.dialect.type_descriptor(
524
binary_table.c[col.name].type).__class__
525
assert issubclass(c1, c2), '%r is not a subclass of %r' \
527
if binary_table.c[col.name].type.length:
528
testing.eq_(col.type.length,
529
binary_table.c[col.name].type.length)
532
def test_autoincrement(self):
533
Table('ai_1', metadata,
534
Column('int_y', Integer, primary_key=True),
535
Column('int_n', Integer, DefaultClause('0'),
536
primary_key=True, autoincrement=False))
537
Table('ai_2', metadata,
538
Column('int_y', Integer, primary_key=True),
539
Column('int_n', Integer, DefaultClause('0'),
540
primary_key=True, autoincrement=False))
541
Table('ai_3', metadata,
542
Column('int_n', Integer, DefaultClause('0'),
543
primary_key=True, autoincrement=False),
544
Column('int_y', Integer, primary_key=True))
545
Table('ai_4', metadata,
546
Column('int_n', Integer, DefaultClause('0'),
547
primary_key=True, autoincrement=False),
548
Column('int_n2', Integer, DefaultClause('0'),
549
primary_key=True, autoincrement=False))
550
Table('ai_5', metadata,
551
Column('int_y', Integer, primary_key=True),
552
Column('int_n', Integer, DefaultClause('0'),
553
primary_key=True, autoincrement=False))
554
Table('ai_6', metadata,
555
Column('o1', String(1), DefaultClause('x'),
557
Column('int_y', Integer, primary_key=True))
558
Table('ai_7', metadata,
559
Column('o1', String(1), DefaultClause('x'),
561
Column('o2', String(1), DefaultClause('x'),
563
Column('int_y', Integer, primary_key=True))
564
Table('ai_8', metadata,
565
Column('o1', String(1), DefaultClause('x'),
567
Column('o2', String(1), DefaultClause('x'),
569
metadata.create_all()
571
table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
572
'ai_5', 'ai_6', 'ai_7', 'ai_8']
573
mr = MetaData(testing.db)
575
for name in table_names:
576
tbl = Table(name, mr, autoload=True)
577
tbl = metadata.tables[name]
579
if c.name.startswith('int_y'):
580
assert c.autoincrement, name
581
assert tbl._autoincrement_column is c, name
582
elif c.name.startswith('int_n'):
583
assert not c.autoincrement, name
584
assert tbl._autoincrement_column is not c, name
586
# mxodbc can't handle scope_identity() with DEFAULT VALUES
588
if testing.db.driver == 'mxodbc':
590
[engines.testing_engine(options={'implicit_returning'
594
[engines.testing_engine(options={'implicit_returning'
596
engines.testing_engine(options={'implicit_returning'
599
for counter, engine in enumerate(eng):
600
engine.execute(tbl.insert())
602
assert engine.scalar(select([tbl.c.int_y])) \
604
assert list(engine.execute(tbl.select()).first()).\
605
count(counter + 1) == 1
608
not in list(engine.execute(tbl.select()).first())
609
engine.execute(tbl.delete())
611
class MonkeyPatchedBinaryTest(fixtures.TestBase):
612
__only_on__ = 'mssql+pymssql'
614
def test_unicode(self):
615
module = __import__('pymssql')
616
result = module.Binary('foo')
def test_bytes(self):
    # pymssql's Binary() should pass byte strings through unchanged.
    module = __import__('pymssql')
    input = b('\x80\x03]q\x00X\x03\x00\x00\x00oneq\x01a.')
    expected_result = input
    result = module.Binary(input)
    eq_(result, expected_result)
626
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
627
"""Test the Binary and VarBinary types"""
629
__only_on__ = 'mssql'
632
def setup_class(cls):
633
global binary_table, MyPickleType
635
class MyPickleType(types.TypeDecorator):
638
def process_bind_param(self, value, dialect):
640
value.stuff = 'this is modified stuff'
643
def process_result_value(self, value, dialect):
645
value.stuff = 'this is the right stuff'
648
binary_table = Table(
650
MetaData(testing.db),
651
Column('primary_id', Integer, Sequence('binary_id_seq',
652
optional=True), primary_key=True),
653
Column('data', mssql.MSVarBinary(8000)),
654
Column('data_image', mssql.MSImage),
655
Column('data_slice', types.BINARY(100)),
656
Column('misc', String(30)),
657
Column('pickled', PickleType),
658
Column('mypickle', MyPickleType),
660
binary_table.create()
663
binary_table.delete().execute()
666
def teardown_class(cls):
669
def test_binary(self):
670
testobj1 = pickleable.Foo('im foo 1')
671
testobj2 = pickleable.Foo('im foo 2')
672
testobj3 = pickleable.Foo('im foo 3')
673
stream1 = self.load_stream('binary_data_one.dat')
674
stream2 = self.load_stream('binary_data_two.dat')
675
binary_table.insert().execute(
677
misc='binary_data_one.dat',
680
data_slice=stream1[0:100],
684
binary_table.insert().execute(
686
misc='binary_data_two.dat',
689
data_slice=stream2[0:99],
693
# TODO: pyodbc does not seem to accept "None" for a VARBINARY
694
# column (data=None). error: [Microsoft][ODBC SQL Server
695
# Driver][SQL Server]Implicit conversion from data type varchar
696
# to varbinary is not allowed. Use the CONVERT function to run
697
# this query. (257) binary_table.insert().execute(primary_id=3,
698
# misc='binary_data_two.dat', data=None, data_image=None,
699
# data_slice=stream2[0:99], pickled=None)
701
binary_table.insert().execute(primary_id=3,
702
misc='binary_data_two.dat', data_image=None,
703
data_slice=stream2[0:99], pickled=None)
705
binary_table.select(order_by=binary_table.c.primary_id), \
706
text('select * from binary_table order by '
707
'binary_table.primary_id',
708
typemap=dict(data=mssql.MSVarBinary(8000),
709
data_image=mssql.MSImage,
710
data_slice=types.BINARY(100), pickled=PickleType,
711
mypickle=MyPickleType), bind=testing.db):
712
l = stmt.execute().fetchall()
713
eq_(list(stream1), list(l[0]['data']))
714
paddedstream = list(stream1[0:100])
715
paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
716
eq_(paddedstream, list(l[0]['data_slice']))
717
eq_(list(stream2), list(l[1]['data']))
718
eq_(list(stream2), list(l[1]['data_image']))
719
eq_(testobj1, l[0]['pickled'])
720
eq_(testobj2, l[1]['pickled'])
721
eq_(testobj3.moredata, l[0]['mypickle'].moredata)
722
eq_(l[0]['mypickle'].stuff, 'this is the right stuff')
724
def load_stream(self, name, len=3000):
725
fp = open(os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb')
726
stream = fp.read(len)