~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/dialect/mssql/test_types.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- encoding: utf-8
 
2
from __future__ import with_statement
 
3
from sqlalchemy.testing import eq_, engines, pickleable
 
4
import datetime
 
5
import os
 
6
from sqlalchemy import *
 
7
from sqlalchemy import types, schema
 
8
from sqlalchemy.databases import mssql
 
9
from sqlalchemy.dialects.mssql.base import TIME
 
10
from sqlalchemy.testing import fixtures, \
 
11
        AssertsExecutionResults, ComparesTables
 
12
from sqlalchemy import testing
 
13
from sqlalchemy.testing import emits_warning_on
 
14
import decimal
 
15
from sqlalchemy.util import b
 
16
 
 
17
 
 
18
class TimeTypeTest(fixtures.TestBase):
 
19
 
 
20
    def test_result_processor_no_microseconds(self):
 
21
        expected = datetime.time(12, 34, 56)
 
22
        self._assert_result_processor(expected, '12:34:56')
 
23
 
 
24
    def test_result_processor_too_many_microseconds(self):
 
25
        # microsecond must be in 0..999999, should truncate (6 vs 7 digits)
 
26
        expected = datetime.time(12, 34, 56, 123456)
 
27
        self._assert_result_processor(expected, '12:34:56.1234567')
 
28
 
 
29
    def _assert_result_processor(self, expected, value):
 
30
        mssql_time_type = TIME()
 
31
        result_processor = mssql_time_type.result_processor(None, None)
 
32
        eq_(expected, result_processor(value))
 
33
 
 
34
 
 
35
class TypeDDLTest(fixtures.TestBase):
 
36
    def test_boolean(self):
 
37
        "Exercise type specification for boolean type."
 
38
 
 
39
        columns = [
 
40
            # column type, args, kwargs, expected ddl
 
41
            (Boolean, [], {},
 
42
             'BIT'),
 
43
           ]
 
44
 
 
45
        metadata = MetaData()
 
46
        table_args = ['test_mssql_boolean', metadata]
 
47
        for index, spec in enumerate(columns):
 
48
            type_, args, kw, res = spec
 
49
            table_args.append(
 
50
                Column('c%s' % index, type_(*args, **kw), nullable=None))
 
51
 
 
52
        boolean_table = Table(*table_args)
 
53
        dialect = mssql.dialect()
 
54
        gen = dialect.ddl_compiler(dialect, schema.CreateTable(boolean_table))
 
55
 
 
56
        for col in boolean_table.c:
 
57
            index = int(col.name[1:])
 
58
            testing.eq_(gen.get_column_specification(col),
 
59
                           "%s %s" % (col.name, columns[index][3]))
 
60
            self.assert_(repr(col))
 
61
 
 
62
 
 
63
    def test_numeric(self):
 
64
        "Exercise type specification and options for numeric types."
 
65
 
 
66
        columns = [
 
67
            # column type, args, kwargs, expected ddl
 
68
            (types.NUMERIC, [], {},
 
69
             'NUMERIC'),
 
70
            (types.NUMERIC, [None], {},
 
71
             'NUMERIC'),
 
72
            (types.NUMERIC, [12, 4], {},
 
73
             'NUMERIC(12, 4)'),
 
74
 
 
75
            (types.Float, [], {},
 
76
             'FLOAT'),
 
77
            (types.Float, [None], {},
 
78
             'FLOAT'),
 
79
            (types.Float, [12], {},
 
80
             'FLOAT(12)'),
 
81
            (mssql.MSReal, [], {},
 
82
             'REAL'),
 
83
 
 
84
            (types.Integer, [], {},
 
85
             'INTEGER'),
 
86
            (types.BigInteger, [], {},
 
87
             'BIGINT'),
 
88
            (mssql.MSTinyInteger, [], {},
 
89
             'TINYINT'),
 
90
            (types.SmallInteger, [], {},
 
91
             'SMALLINT'),
 
92
           ]
 
93
 
 
94
        metadata = MetaData()
 
95
        table_args = ['test_mssql_numeric', metadata]
 
96
        for index, spec in enumerate(columns):
 
97
            type_, args, kw, res = spec
 
98
            table_args.append(
 
99
                Column('c%s' % index, type_(*args, **kw), nullable=None))
 
100
 
 
101
        numeric_table = Table(*table_args)
 
102
        dialect = mssql.dialect()
 
103
        gen = dialect.ddl_compiler(dialect, schema.CreateTable(numeric_table))
 
104
 
 
105
        for col in numeric_table.c:
 
106
            index = int(col.name[1:])
 
107
            testing.eq_(gen.get_column_specification(col),
 
108
                           "%s %s" % (col.name, columns[index][3]))
 
109
            self.assert_(repr(col))
 
110
 
 
111
 
 
112
    def test_char(self):
 
113
        """Exercise COLLATE-ish options on string types."""
 
114
 
 
115
        columns = [
 
116
            (mssql.MSChar, [], {},
 
117
             'CHAR'),
 
118
            (mssql.MSChar, [1], {},
 
119
             'CHAR(1)'),
 
120
            (mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'},
 
121
             'CHAR(1) COLLATE Latin1_General_CI_AS'),
 
122
 
 
123
            (mssql.MSNChar, [], {},
 
124
             'NCHAR'),
 
125
            (mssql.MSNChar, [1], {},
 
126
             'NCHAR(1)'),
 
127
            (mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'},
 
128
             'NCHAR(1) COLLATE Latin1_General_CI_AS'),
 
129
 
 
130
            (mssql.MSString, [], {},
 
131
             'VARCHAR(max)'),
 
132
            (mssql.MSString, [1], {},
 
133
             'VARCHAR(1)'),
 
134
            (mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'},
 
135
             'VARCHAR(1) COLLATE Latin1_General_CI_AS'),
 
136
 
 
137
            (mssql.MSNVarchar, [], {},
 
138
             'NVARCHAR(max)'),
 
139
            (mssql.MSNVarchar, [1], {},
 
140
             'NVARCHAR(1)'),
 
141
            (mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'},
 
142
             'NVARCHAR(1) COLLATE Latin1_General_CI_AS'),
 
143
 
 
144
            (mssql.MSText, [], {},
 
145
             'TEXT'),
 
146
            (mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'},
 
147
             'TEXT COLLATE Latin1_General_CI_AS'),
 
148
 
 
149
            (mssql.MSNText, [], {},
 
150
             'NTEXT'),
 
151
            (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'},
 
152
             'NTEXT COLLATE Latin1_General_CI_AS'),
 
153
           ]
 
154
 
 
155
        metadata = MetaData()
 
156
        table_args = ['test_mssql_charset', metadata]
 
157
        for index, spec in enumerate(columns):
 
158
            type_, args, kw, res = spec
 
159
            table_args.append(
 
160
                Column('c%s' % index, type_(*args, **kw), nullable=None))
 
161
 
 
162
        charset_table = Table(*table_args)
 
163
        dialect = mssql.dialect()
 
164
        gen = dialect.ddl_compiler(dialect, schema.CreateTable(charset_table))
 
165
 
 
166
        for col in charset_table.c:
 
167
            index = int(col.name[1:])
 
168
            testing.eq_(gen.get_column_specification(col),
 
169
                           "%s %s" % (col.name, columns[index][3]))
 
170
            self.assert_(repr(col))
 
171
 
 
172
 
 
173
    def test_timestamp(self):
 
174
        """Exercise TIMESTAMP column."""
 
175
 
 
176
        dialect = mssql.dialect()
 
177
 
 
178
        metadata = MetaData()
 
179
        spec, expected = (TIMESTAMP, 'TIMESTAMP')
 
180
        t = Table('mssql_ts', metadata,
 
181
                   Column('id', Integer, primary_key=True),
 
182
                   Column('t', spec, nullable=None))
 
183
        gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
 
184
        testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
 
185
        self.assert_(repr(t.c.t))
 
186
 
 
187
    def test_money(self):
 
188
        """Exercise type specification for money types."""
 
189
 
 
190
        columns = [(mssql.MSMoney, [], {}, 'MONEY'),
 
191
                   (mssql.MSSmallMoney, [], {}, 'SMALLMONEY')]
 
192
        metadata = MetaData()
 
193
        table_args = ['test_mssql_money', metadata]
 
194
        for index, spec in enumerate(columns):
 
195
            type_, args, kw, res = spec
 
196
            table_args.append(Column('c%s' % index, type_(*args, **kw),
 
197
                              nullable=None))
 
198
        money_table = Table(*table_args)
 
199
        dialect = mssql.dialect()
 
200
        gen = dialect.ddl_compiler(dialect,
 
201
                                   schema.CreateTable(money_table))
 
202
        for col in money_table.c:
 
203
            index = int(col.name[1:])
 
204
            testing.eq_(gen.get_column_specification(col), '%s %s'
 
205
                        % (col.name, columns[index][3]))
 
206
            self.assert_(repr(col))
 
207
 
 
208
    def test_binary(self):
 
209
        "Exercise type specification for binary types."
 
210
 
 
211
        columns = [
 
212
            # column type, args, kwargs, expected ddl
 
213
            (mssql.MSBinary, [], {},
 
214
             'BINARY'),
 
215
            (mssql.MSBinary, [10], {},
 
216
             'BINARY(10)'),
 
217
 
 
218
            (types.BINARY, [], {},
 
219
             'BINARY'),
 
220
            (types.BINARY, [10], {},
 
221
             'BINARY(10)'),
 
222
 
 
223
            (mssql.MSVarBinary, [], {},
 
224
             'VARBINARY(max)'),
 
225
            (mssql.MSVarBinary, [10], {},
 
226
             'VARBINARY(10)'),
 
227
 
 
228
            (types.VARBINARY, [10], {},
 
229
             'VARBINARY(10)'),
 
230
            (types.VARBINARY, [], {},
 
231
             'VARBINARY(max)'),
 
232
 
 
233
            (mssql.MSImage, [], {},
 
234
             'IMAGE'),
 
235
 
 
236
            (mssql.IMAGE, [], {},
 
237
             'IMAGE'),
 
238
 
 
239
            (types.LargeBinary, [], {},
 
240
             'IMAGE'),
 
241
        ]
 
242
 
 
243
        metadata = MetaData()
 
244
        table_args = ['test_mssql_binary', metadata]
 
245
        for index, spec in enumerate(columns):
 
246
            type_, args, kw, res = spec
 
247
            table_args.append(Column('c%s' % index, type_(*args, **kw),
 
248
                              nullable=None))
 
249
        binary_table = Table(*table_args)
 
250
        dialect = mssql.dialect()
 
251
        gen = dialect.ddl_compiler(dialect,
 
252
                                   schema.CreateTable(binary_table))
 
253
        for col in binary_table.c:
 
254
            index = int(col.name[1:])
 
255
            testing.eq_(gen.get_column_specification(col), '%s %s'
 
256
                        % (col.name, columns[index][3]))
 
257
            self.assert_(repr(col))
 
258
 
 
259
class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults, ComparesTables):
 
260
    __only_on__ = 'mssql'
 
261
 
 
262
    @classmethod
 
263
    def setup_class(cls):
 
264
        global metadata
 
265
        metadata = MetaData(testing.db)
 
266
 
 
267
    def teardown(self):
 
268
        metadata.drop_all()
 
269
 
 
270
    @testing.fails_on_everything_except('mssql+pyodbc',
 
271
            'this is some pyodbc-specific feature')
 
272
    def test_decimal_notation(self):
 
273
        numeric_table = Table('numeric_table', metadata, Column('id',
 
274
                              Integer, Sequence('numeric_id_seq',
 
275
                              optional=True), primary_key=True),
 
276
                              Column('numericcol',
 
277
                              Numeric(precision=38, scale=20,
 
278
                              asdecimal=True)))
 
279
        metadata.create_all()
 
280
        test_items = [decimal.Decimal(d) for d in (
 
281
            '1500000.00000000000000000000',
 
282
            '-1500000.00000000000000000000',
 
283
            '1500000',
 
284
            '0.0000000000000000002',
 
285
            '0.2',
 
286
            '-0.0000000000000000002',
 
287
            '-2E-2',
 
288
            '156666.458923543',
 
289
            '-156666.458923543',
 
290
            '1',
 
291
            '-1',
 
292
            '-1234',
 
293
            '1234',
 
294
            '2E-12',
 
295
            '4E8',
 
296
            '3E-6',
 
297
            '3E-7',
 
298
            '4.1',
 
299
            '1E-1',
 
300
            '1E-2',
 
301
            '1E-3',
 
302
            '1E-4',
 
303
            '1E-5',
 
304
            '1E-6',
 
305
            '1E-7',
 
306
            '1E-1',
 
307
            '1E-8',
 
308
            '0.2732E2',
 
309
            '-0.2432E2',
 
310
            '4.35656E2',
 
311
            '-02452E-2',
 
312
            '45125E-2',
 
313
            '1234.58965E-2',
 
314
            '1.521E+15',
 
315
            '-1E-25',
 
316
            '1E-25',
 
317
            '1254E-25',
 
318
            '-1203E-25',
 
319
            '0',
 
320
            '-0.00',
 
321
            '-0',
 
322
            '4585E12',
 
323
            '000000000000000000012',
 
324
            '000000000000.32E12',
 
325
            '00000000000000.1E+12',
 
326
            '000000000000.2E-32',
 
327
            )]
 
328
 
 
329
        for value in test_items:
 
330
            numeric_table.insert().execute(numericcol=value)
 
331
 
 
332
        for value in select([numeric_table.c.numericcol]).execute():
 
333
            assert value[0] in test_items, "%r not in test_items" % value[0]
 
334
 
 
335
    def test_float(self):
 
336
        float_table = Table('float_table', metadata, Column('id',
 
337
                            Integer, Sequence('numeric_id_seq',
 
338
                            optional=True), primary_key=True),
 
339
                            Column('floatcol', Float()))
 
340
        metadata.create_all()
 
341
        try:
 
342
            test_items = [float(d) for d in (
 
343
                '1500000.00000000000000000000',
 
344
                '-1500000.00000000000000000000',
 
345
                '1500000',
 
346
                '0.0000000000000000002',
 
347
                '0.2',
 
348
                '-0.0000000000000000002',
 
349
                '156666.458923543',
 
350
                '-156666.458923543',
 
351
                '1',
 
352
                '-1',
 
353
                '1234',
 
354
                '2E-12',
 
355
                '4E8',
 
356
                '3E-6',
 
357
                '3E-7',
 
358
                '4.1',
 
359
                '1E-1',
 
360
                '1E-2',
 
361
                '1E-3',
 
362
                '1E-4',
 
363
                '1E-5',
 
364
                '1E-6',
 
365
                '1E-7',
 
366
                '1E-8',
 
367
                )]
 
368
            for value in test_items:
 
369
                float_table.insert().execute(floatcol=value)
 
370
        except Exception:
 
371
            raise
 
372
 
 
373
 
 
374
    # todo this should suppress warnings, but it does not
 
375
    @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
 
376
    def test_dates(self):
 
377
        "Exercise type specification for date types."
 
378
 
 
379
        columns = [
 
380
            # column type, args, kwargs, expected ddl
 
381
            (mssql.MSDateTime, [], {},
 
382
             'DATETIME', []),
 
383
 
 
384
            (types.DATE, [], {},
 
385
             'DATE', ['>=', (10,)]),
 
386
            (types.Date, [], {},
 
387
             'DATE', ['>=', (10,)]),
 
388
            (types.Date, [], {},
 
389
             'DATETIME', ['<', (10,)], mssql.MSDateTime),
 
390
            (mssql.MSDate, [], {},
 
391
             'DATE', ['>=', (10,)]),
 
392
            (mssql.MSDate, [], {},
 
393
             'DATETIME', ['<', (10,)], mssql.MSDateTime),
 
394
 
 
395
            (types.TIME, [], {},
 
396
             'TIME', ['>=', (10,)]),
 
397
            (types.Time, [], {},
 
398
             'TIME', ['>=', (10,)]),
 
399
            (mssql.MSTime, [], {},
 
400
             'TIME', ['>=', (10,)]),
 
401
            (mssql.MSTime, [1], {},
 
402
             'TIME(1)', ['>=', (10,)]),
 
403
            (types.Time, [], {},
 
404
             'DATETIME', ['<', (10,)], mssql.MSDateTime),
 
405
            (mssql.MSTime, [], {},
 
406
             'TIME', ['>=', (10,)]),
 
407
 
 
408
            (mssql.MSSmallDateTime, [], {},
 
409
             'SMALLDATETIME', []),
 
410
 
 
411
            (mssql.MSDateTimeOffset, [], {},
 
412
             'DATETIMEOFFSET', ['>=', (10,)]),
 
413
            (mssql.MSDateTimeOffset, [1], {},
 
414
             'DATETIMEOFFSET(1)', ['>=', (10,)]),
 
415
 
 
416
            (mssql.MSDateTime2, [], {},
 
417
             'DATETIME2', ['>=', (10,)]),
 
418
            (mssql.MSDateTime2, [1], {},
 
419
             'DATETIME2(1)', ['>=', (10,)]),
 
420
 
 
421
            ]
 
422
 
 
423
        table_args = ['test_mssql_dates', metadata]
 
424
        for index, spec in enumerate(columns):
 
425
            type_, args, kw, res, requires = spec[0:5]
 
426
            if requires and testing._is_excluded('mssql', *requires) \
 
427
                or not requires:
 
428
                c = Column('c%s' % index, type_(*args,
 
429
                                  **kw), nullable=None)
 
430
                testing.db.dialect.type_descriptor(c.type)
 
431
                table_args.append(c)
 
432
        dates_table = Table(*table_args)
 
433
        gen = testing.db.dialect.ddl_compiler(testing.db.dialect,
 
434
                schema.CreateTable(dates_table))
 
435
        for col in dates_table.c:
 
436
            index = int(col.name[1:])
 
437
            testing.eq_(gen.get_column_specification(col), '%s %s'
 
438
                        % (col.name, columns[index][3]))
 
439
            self.assert_(repr(col))
 
440
        dates_table.create(checkfirst=True)
 
441
        reflected_dates = Table('test_mssql_dates',
 
442
                                MetaData(testing.db), autoload=True)
 
443
        for col in reflected_dates.c:
 
444
            self.assert_types_base(col, dates_table.c[col.key])
 
445
 
 
446
    def test_date_roundtrip(self):
 
447
        t = Table('test_dates', metadata,
 
448
                    Column('id', Integer,
 
449
                           Sequence('datetest_id_seq', optional=True),
 
450
                           primary_key=True),
 
451
                    Column('adate', Date),
 
452
                    Column('atime', Time),
 
453
                    Column('adatetime', DateTime))
 
454
        metadata.create_all()
 
455
        d1 = datetime.date(2007, 10, 30)
 
456
        t1 = datetime.time(11, 2, 32)
 
457
        d2 = datetime.datetime(2007, 10, 30, 11, 2, 32)
 
458
        t.insert().execute(adate=d1, adatetime=d2, atime=t1)
 
459
        t.insert().execute(adate=d2, adatetime=d2, atime=d2)
 
460
 
 
461
        x = t.select().execute().fetchall()[0]
 
462
        self.assert_(x.adate.__class__ == datetime.date)
 
463
        self.assert_(x.atime.__class__ == datetime.time)
 
464
        self.assert_(x.adatetime.__class__ == datetime.datetime)
 
465
 
 
466
        t.delete().execute()
 
467
 
 
468
        t.insert().execute(adate=d1, adatetime=d2, atime=t1)
 
469
 
 
470
        eq_(select([t.c.adate, t.c.atime, t.c.adatetime], t.c.adate
 
471
            == d1).execute().fetchall(), [(d1, t1, d2)])
 
472
 
 
473
    @emits_warning_on('mssql+mxodbc', r'.*does not have any indexes.*')
 
474
    @testing.provide_metadata
 
475
    def test_binary_reflection(self):
 
476
        "Exercise type specification for binary types."
 
477
 
 
478
        columns = [
 
479
            # column type, args, kwargs, expected ddl
 
480
            (mssql.MSBinary, [], {},
 
481
             'BINARY'),
 
482
            (mssql.MSBinary, [10], {},
 
483
             'BINARY(10)'),
 
484
 
 
485
            (types.BINARY, [], {},
 
486
             'BINARY'),
 
487
            (types.BINARY, [10], {},
 
488
             'BINARY(10)'),
 
489
 
 
490
            (mssql.MSVarBinary, [], {},
 
491
             'VARBINARY(max)'),
 
492
            (mssql.MSVarBinary, [10], {},
 
493
             'VARBINARY(10)'),
 
494
 
 
495
            (types.VARBINARY, [10], {},
 
496
             'VARBINARY(10)'),
 
497
            (types.VARBINARY, [], {},
 
498
             'VARBINARY(max)'),
 
499
 
 
500
            (mssql.MSImage, [], {},
 
501
             'IMAGE'),
 
502
 
 
503
            (mssql.IMAGE, [], {},
 
504
             'IMAGE'),
 
505
 
 
506
            (types.LargeBinary, [], {},
 
507
             'IMAGE'),
 
508
        ]
 
509
 
 
510
        metadata = self.metadata
 
511
        table_args = ['test_mssql_binary', metadata]
 
512
        for index, spec in enumerate(columns):
 
513
            type_, args, kw, res = spec
 
514
            table_args.append(Column('c%s' % index, type_(*args, **kw),
 
515
                              nullable=None))
 
516
        binary_table = Table(*table_args)
 
517
        metadata.create_all()
 
518
        reflected_binary = Table('test_mssql_binary',
 
519
                                 MetaData(testing.db), autoload=True)
 
520
        for col in reflected_binary.c:
 
521
            c1 = testing.db.dialect.type_descriptor(col.type).__class__
 
522
            c2 = \
 
523
                testing.db.dialect.type_descriptor(
 
524
                    binary_table.c[col.name].type).__class__
 
525
            assert issubclass(c1, c2), '%r is not a subclass of %r' \
 
526
                % (c1, c2)
 
527
            if binary_table.c[col.name].type.length:
 
528
                testing.eq_(col.type.length,
 
529
                            binary_table.c[col.name].type.length)
 
530
 
 
531
 
 
532
    def test_autoincrement(self):
 
533
        Table('ai_1', metadata,
 
534
               Column('int_y', Integer, primary_key=True),
 
535
               Column('int_n', Integer, DefaultClause('0'),
 
536
                      primary_key=True, autoincrement=False))
 
537
        Table('ai_2', metadata,
 
538
               Column('int_y', Integer, primary_key=True),
 
539
               Column('int_n', Integer, DefaultClause('0'),
 
540
                      primary_key=True, autoincrement=False))
 
541
        Table('ai_3', metadata,
 
542
               Column('int_n', Integer, DefaultClause('0'),
 
543
                      primary_key=True, autoincrement=False),
 
544
               Column('int_y', Integer, primary_key=True))
 
545
        Table('ai_4', metadata,
 
546
               Column('int_n', Integer, DefaultClause('0'),
 
547
                      primary_key=True, autoincrement=False),
 
548
               Column('int_n2', Integer, DefaultClause('0'),
 
549
                      primary_key=True, autoincrement=False))
 
550
        Table('ai_5', metadata,
 
551
               Column('int_y', Integer, primary_key=True),
 
552
               Column('int_n', Integer, DefaultClause('0'),
 
553
                      primary_key=True, autoincrement=False))
 
554
        Table('ai_6', metadata,
 
555
               Column('o1', String(1), DefaultClause('x'),
 
556
                      primary_key=True),
 
557
               Column('int_y', Integer, primary_key=True))
 
558
        Table('ai_7', metadata,
 
559
               Column('o1', String(1), DefaultClause('x'),
 
560
                      primary_key=True),
 
561
               Column('o2', String(1), DefaultClause('x'),
 
562
                      primary_key=True),
 
563
               Column('int_y', Integer, primary_key=True))
 
564
        Table('ai_8', metadata,
 
565
               Column('o1', String(1), DefaultClause('x'),
 
566
                      primary_key=True),
 
567
               Column('o2', String(1), DefaultClause('x'),
 
568
                      primary_key=True))
 
569
        metadata.create_all()
 
570
 
 
571
        table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
 
572
                        'ai_5', 'ai_6', 'ai_7', 'ai_8']
 
573
        mr = MetaData(testing.db)
 
574
 
 
575
        for name in table_names:
 
576
            tbl = Table(name, mr, autoload=True)
 
577
            tbl = metadata.tables[name]
 
578
            for c in tbl.c:
 
579
                if c.name.startswith('int_y'):
 
580
                    assert c.autoincrement, name
 
581
                    assert tbl._autoincrement_column is c, name
 
582
                elif c.name.startswith('int_n'):
 
583
                    assert not c.autoincrement, name
 
584
                    assert tbl._autoincrement_column is not c, name
 
585
 
 
586
            # mxodbc can't handle scope_identity() with DEFAULT VALUES
 
587
 
 
588
            if testing.db.driver == 'mxodbc':
 
589
                eng = \
 
590
                    [engines.testing_engine(options={'implicit_returning'
 
591
                     : True})]
 
592
            else:
 
593
                eng = \
 
594
                    [engines.testing_engine(options={'implicit_returning'
 
595
                     : False}),
 
596
                     engines.testing_engine(options={'implicit_returning'
 
597
                     : True})]
 
598
 
 
599
            for counter, engine in enumerate(eng):
 
600
                engine.execute(tbl.insert())
 
601
                if 'int_y' in tbl.c:
 
602
                    assert engine.scalar(select([tbl.c.int_y])) \
 
603
                        == counter + 1
 
604
                    assert list(engine.execute(tbl.select()).first()).\
 
605
                            count(counter + 1) == 1
 
606
                else:
 
607
                    assert 1 \
 
608
                        not in list(engine.execute(tbl.select()).first())
 
609
                engine.execute(tbl.delete())
 
610
 
 
611
class MonkeyPatchedBinaryTest(fixtures.TestBase):
 
612
    __only_on__ = 'mssql+pymssql'
 
613
 
 
614
    def test_unicode(self):
 
615
        module = __import__('pymssql')
 
616
        result = module.Binary('foo')
 
617
        eq_(result, 'foo')
 
618
 
 
619
    def test_bytes(self):
 
620
        module = __import__('pymssql')
 
621
        input = b('\x80\x03]q\x00X\x03\x00\x00\x00oneq\x01a.')
 
622
        expected_result = input
 
623
        result = module.Binary(input)
 
624
        eq_(result, expected_result)
 
625
 
 
626
class BinaryTest(fixtures.TestBase, AssertsExecutionResults):
 
627
    """Test the Binary and VarBinary types"""
 
628
 
 
629
    __only_on__ = 'mssql'
 
630
 
 
631
    @classmethod
 
632
    def setup_class(cls):
 
633
        global binary_table, MyPickleType
 
634
 
 
635
        class MyPickleType(types.TypeDecorator):
 
636
            impl = PickleType
 
637
 
 
638
            def process_bind_param(self, value, dialect):
 
639
                if value:
 
640
                    value.stuff = 'this is modified stuff'
 
641
                return value
 
642
 
 
643
            def process_result_value(self, value, dialect):
 
644
                if value:
 
645
                    value.stuff = 'this is the right stuff'
 
646
                return value
 
647
 
 
648
        binary_table = Table(
 
649
            'binary_table',
 
650
            MetaData(testing.db),
 
651
            Column('primary_id', Integer, Sequence('binary_id_seq',
 
652
                   optional=True), primary_key=True),
 
653
            Column('data', mssql.MSVarBinary(8000)),
 
654
            Column('data_image', mssql.MSImage),
 
655
            Column('data_slice', types.BINARY(100)),
 
656
            Column('misc', String(30)),
 
657
            Column('pickled', PickleType),
 
658
            Column('mypickle', MyPickleType),
 
659
            )
 
660
        binary_table.create()
 
661
 
 
662
    def teardown(self):
 
663
        binary_table.delete().execute()
 
664
 
 
665
    @classmethod
 
666
    def teardown_class(cls):
 
667
        binary_table.drop()
 
668
 
 
669
    def test_binary(self):
 
670
        testobj1 = pickleable.Foo('im foo 1')
 
671
        testobj2 = pickleable.Foo('im foo 2')
 
672
        testobj3 = pickleable.Foo('im foo 3')
 
673
        stream1 = self.load_stream('binary_data_one.dat')
 
674
        stream2 = self.load_stream('binary_data_two.dat')
 
675
        binary_table.insert().execute(
 
676
            primary_id=1,
 
677
            misc='binary_data_one.dat',
 
678
            data=stream1,
 
679
            data_image=stream1,
 
680
            data_slice=stream1[0:100],
 
681
            pickled=testobj1,
 
682
            mypickle=testobj3,
 
683
            )
 
684
        binary_table.insert().execute(
 
685
            primary_id=2,
 
686
            misc='binary_data_two.dat',
 
687
            data=stream2,
 
688
            data_image=stream2,
 
689
            data_slice=stream2[0:99],
 
690
            pickled=testobj2,
 
691
            )
 
692
 
 
693
        # TODO: pyodbc does not seem to accept "None" for a VARBINARY
 
694
        # column (data=None). error:  [Microsoft][ODBC SQL Server
 
695
        # Driver][SQL Server]Implicit conversion from data type varchar
 
696
        # to varbinary is not allowed. Use the CONVERT function to run
 
697
        # this query. (257) binary_table.insert().execute(primary_id=3,
 
698
        # misc='binary_data_two.dat', data=None, data_image=None,
 
699
        # data_slice=stream2[0:99], pickled=None)
 
700
 
 
701
        binary_table.insert().execute(primary_id=3,
 
702
                misc='binary_data_two.dat', data_image=None,
 
703
                data_slice=stream2[0:99], pickled=None)
 
704
        for stmt in \
 
705
            binary_table.select(order_by=binary_table.c.primary_id), \
 
706
            text('select * from binary_table order by '
 
707
                 'binary_table.primary_id',
 
708
                 typemap=dict(data=mssql.MSVarBinary(8000),
 
709
                 data_image=mssql.MSImage,
 
710
                 data_slice=types.BINARY(100), pickled=PickleType,
 
711
                 mypickle=MyPickleType), bind=testing.db):
 
712
            l = stmt.execute().fetchall()
 
713
            eq_(list(stream1), list(l[0]['data']))
 
714
            paddedstream = list(stream1[0:100])
 
715
            paddedstream.extend(['\x00'] * (100 - len(paddedstream)))
 
716
            eq_(paddedstream, list(l[0]['data_slice']))
 
717
            eq_(list(stream2), list(l[1]['data']))
 
718
            eq_(list(stream2), list(l[1]['data_image']))
 
719
            eq_(testobj1, l[0]['pickled'])
 
720
            eq_(testobj2, l[1]['pickled'])
 
721
            eq_(testobj3.moredata, l[0]['mypickle'].moredata)
 
722
            eq_(l[0]['mypickle'].stuff, 'this is the right stuff')
 
723
 
 
724
    def load_stream(self, name, len=3000):
 
725
        fp = open(os.path.join(os.path.dirname(__file__), "..", "..", name), 'rb')
 
726
        stream = fp.read(len)
 
727
        fp.close()
 
728
        return stream