~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/dialect/postgresql/test_types.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# coding: utf-8
 
2
from __future__ import with_statement
 
3
from sqlalchemy.testing.assertions import eq_, assert_raises, \
 
4
                assert_raises_message, is_, AssertsExecutionResults, \
 
5
                AssertsCompiledSQL, ComparesTables
 
6
from sqlalchemy.testing import engines, fixtures
 
7
from sqlalchemy import testing
 
8
import datetime
 
9
from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
 
10
            String, Sequence, ForeignKey, join, Numeric, \
 
11
            PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
 
12
            func, literal_column, literal, bindparam, cast, extract, \
 
13
            SmallInteger, Enum, REAL, update, insert, Index, delete, \
 
14
            and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
 
15
from sqlalchemy.orm import Session, mapper, aliased
 
16
from sqlalchemy import exc, schema, types
 
17
from sqlalchemy.dialects.postgresql import base as postgresql
 
18
from sqlalchemy.dialects.postgresql import HSTORE, hstore, array, \
 
19
            INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, TSTZRANGE
 
20
import decimal
 
21
from sqlalchemy import util
 
22
from sqlalchemy.testing.util import round_decimal
 
23
from sqlalchemy.sql import table, column, operators
 
24
import logging
 
25
import re
 
26
 
 
27
class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults):
 
28
    __only_on__ = 'postgresql'
 
29
    __dialect__ = postgresql.dialect()
 
30
 
 
31
    @classmethod
 
32
    def define_tables(cls, metadata):
 
33
        data_table = Table('data_table', metadata,
 
34
            Column('id', Integer, primary_key=True),
 
35
            Column('data', Integer)
 
36
        )
 
37
 
 
38
    @classmethod
 
39
    def insert_data(cls):
 
40
        data_table = cls.tables.data_table
 
41
 
 
42
        data_table.insert().execute(
 
43
            {'data':3},
 
44
            {'data':5},
 
45
            {'data':7},
 
46
            {'data':2},
 
47
            {'data':15},
 
48
            {'data':12},
 
49
            {'data':6},
 
50
            {'data':478},
 
51
            {'data':52},
 
52
            {'data':9},
 
53
        )
 
54
 
 
55
    @testing.fails_on('postgresql+zxjdbc',
 
56
                      'XXX: postgresql+zxjdbc currently returns a Decimal result for Float')
 
57
    def test_float_coercion(self):
 
58
        data_table = self.tables.data_table
 
59
 
 
60
        for type_, result in [
 
61
            (Numeric, decimal.Decimal('140.381230939')),
 
62
            (Float, 140.381230939),
 
63
            (Float(asdecimal=True), decimal.Decimal('140.381230939')),
 
64
            (Numeric(asdecimal=False), 140.381230939),
 
65
        ]:
 
66
            ret = testing.db.execute(
 
67
                select([
 
68
                    func.stddev_pop(data_table.c.data, type_=type_)
 
69
                ])
 
70
            ).scalar()
 
71
 
 
72
            eq_(round_decimal(ret, 9), result)
 
73
 
 
74
            ret = testing.db.execute(
 
75
                select([
 
76
                    cast(func.stddev_pop(data_table.c.data), type_)
 
77
                ])
 
78
            ).scalar()
 
79
            eq_(round_decimal(ret, 9), result)
 
80
 
 
81
    @testing.fails_on('postgresql+zxjdbc',
 
82
                      'zxjdbc has no support for PG arrays')
 
83
    @testing.provide_metadata
 
84
    def test_arrays(self):
 
85
        metadata = self.metadata
 
86
        t1 = Table('t', metadata,
 
87
            Column('x', postgresql.ARRAY(Float)),
 
88
            Column('y', postgresql.ARRAY(REAL)),
 
89
            Column('z', postgresql.ARRAY(postgresql.DOUBLE_PRECISION)),
 
90
            Column('q', postgresql.ARRAY(Numeric))
 
91
        )
 
92
        metadata.create_all()
 
93
        t1.insert().execute(x=[5], y=[5], z=[6], q=[decimal.Decimal("6.4")])
 
94
        row = t1.select().execute().first()
 
95
        eq_(
 
96
            row,
 
97
            ([5], [5], [6], [decimal.Decimal("6.4")])
 
98
        )
 
99
 
 
100
class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
 
101
 
 
102
    __only_on__ = 'postgresql'
 
103
    __dialect__ = postgresql.dialect()
 
104
 
 
105
    def test_compile(self):
 
106
        e1 = Enum('x', 'y', 'z', name='somename')
 
107
        e2 = Enum('x', 'y', 'z', name='somename', schema='someschema')
 
108
        self.assert_compile(postgresql.CreateEnumType(e1),
 
109
                            "CREATE TYPE somename AS ENUM ('x','y','z')"
 
110
                            )
 
111
        self.assert_compile(postgresql.CreateEnumType(e2),
 
112
                            "CREATE TYPE someschema.somename AS ENUM "
 
113
                            "('x','y','z')")
 
114
        self.assert_compile(postgresql.DropEnumType(e1),
 
115
                            'DROP TYPE somename')
 
116
        self.assert_compile(postgresql.DropEnumType(e2),
 
117
                            'DROP TYPE someschema.somename')
 
118
        t1 = Table('sometable', MetaData(), Column('somecolumn', e1))
 
119
        self.assert_compile(schema.CreateTable(t1),
 
120
                            'CREATE TABLE sometable (somecolumn '
 
121
                            'somename)')
 
122
        t1 = Table('sometable', MetaData(), Column('somecolumn',
 
123
                   Enum('x', 'y', 'z', native_enum=False)))
 
124
        self.assert_compile(schema.CreateTable(t1),
 
125
                            "CREATE TABLE sometable (somecolumn "
 
126
                            "VARCHAR(1), CHECK (somecolumn IN ('x', "
 
127
                            "'y', 'z')))")
 
128
 
 
129
    @testing.fails_on('postgresql+zxjdbc',
 
130
                      'zxjdbc fails on ENUM: column "XXX" is of type '
 
131
                      'XXX but expression is of type character varying')
 
132
    @testing.fails_on('postgresql+pg8000',
 
133
                      'zxjdbc fails on ENUM: column "XXX" is of type '
 
134
                      'XXX but expression is of type text')
 
135
    def test_create_table(self):
 
136
        metadata = MetaData(testing.db)
 
137
        t1 = Table('table', metadata, Column('id', Integer,
 
138
                   primary_key=True), Column('value', Enum('one', 'two'
 
139
                   , 'three', name='onetwothreetype')))
 
140
        t1.create()
 
141
        t1.create(checkfirst=True)  # check the create
 
142
        try:
 
143
            t1.insert().execute(value='two')
 
144
            t1.insert().execute(value='three')
 
145
            t1.insert().execute(value='three')
 
146
            eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
 
147
                [(1, 'two'), (2, 'three'), (3, 'three')])
 
148
        finally:
 
149
            metadata.drop_all()
 
150
            metadata.drop_all()
 
151
 
 
152
    def test_name_required(self):
 
153
        metadata = MetaData(testing.db)
 
154
        etype = Enum('four', 'five', 'six', metadata=metadata)
 
155
        assert_raises(exc.CompileError, etype.create)
 
156
        assert_raises(exc.CompileError, etype.compile,
 
157
                      dialect=postgresql.dialect())
 
158
 
 
159
    @testing.fails_on('postgresql+zxjdbc',
 
160
                      'zxjdbc fails on ENUM: column "XXX" is of type '
 
161
                      'XXX but expression is of type character varying')
 
162
    @testing.fails_on('postgresql+pg8000',
 
163
                      'zxjdbc fails on ENUM: column "XXX" is of type '
 
164
                      'XXX but expression is of type text')
 
165
    @testing.provide_metadata
 
166
    def test_unicode_labels(self):
 
167
        metadata = self.metadata
 
168
        t1 = Table('table', metadata,
 
169
            Column('id', Integer, primary_key=True),
 
170
            Column('value',
 
171
                    Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'),
 
172
                            name='onetwothreetype'))
 
173
        )
 
174
        metadata.create_all()
 
175
        t1.insert().execute(value=util.u('drôle'))
 
176
        t1.insert().execute(value=util.u('réveillé'))
 
177
        t1.insert().execute(value=util.u('S’il'))
 
178
        eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
 
179
            [(1, util.u('drôle')), (2, util.u('réveillé')),
 
180
                    (3, util.u('S’il'))]
 
181
        )
 
182
        m2 = MetaData(testing.db)
 
183
        t2 = Table('table', m2, autoload=True)
 
184
        eq_(
 
185
            t2.c.value.type.enums,
 
186
            (util.u('réveillé'), util.u('drôle'), util.u('S’il'))
 
187
        )
 
188
 
 
189
    def test_non_native_type(self):
 
190
        metadata = MetaData()
 
191
        t1 = Table('foo', metadata, Column('bar', Enum('one', 'two',
 
192
                   'three', name='myenum', native_enum=False)))
 
193
 
 
194
        def go():
 
195
            t1.create(testing.db)
 
196
 
 
197
        try:
 
198
            self.assert_sql(testing.db, go, [],
 
199
                            with_sequences=[("CREATE TABLE foo (\tbar "
 
200
                            "VARCHAR(5), \tCONSTRAINT myenum CHECK "
 
201
                            "(bar IN ('one', 'two', 'three')))", {})])
 
202
        finally:
 
203
            metadata.drop_all(testing.db)
 
204
 
 
205
    @testing.provide_metadata
 
206
    def test_disable_create(self):
 
207
        metadata = self.metadata
 
208
 
 
209
        e1 = postgresql.ENUM('one', 'two', 'three',
 
210
                            name="myenum",
 
211
                            create_type=False)
 
212
 
 
213
        t1 = Table('e1', metadata,
 
214
            Column('c1', e1)
 
215
        )
 
216
        # table can be created separately
 
217
        # without conflict
 
218
        e1.create(bind=testing.db)
 
219
        t1.create(testing.db)
 
220
        t1.drop(testing.db)
 
221
        e1.drop(bind=testing.db)
 
222
 
 
223
    @testing.provide_metadata
 
224
    def test_generate_multiple(self):
 
225
        """Test that the same enum twice only generates once
 
226
        for the create_all() call, without using checkfirst.
 
227
 
 
228
        A 'memo' collection held by the DDL runner
 
229
        now handles this.
 
230
 
 
231
        """
 
232
        metadata = self.metadata
 
233
 
 
234
        e1 = Enum('one', 'two', 'three',
 
235
                            name="myenum")
 
236
        t1 = Table('e1', metadata,
 
237
            Column('c1', e1)
 
238
        )
 
239
 
 
240
        t2 = Table('e2', metadata,
 
241
            Column('c1', e1)
 
242
        )
 
243
 
 
244
        metadata.create_all(checkfirst=False)
 
245
        metadata.drop_all(checkfirst=False)
 
246
 
 
247
    def test_non_native_dialect(self):
 
248
        engine = engines.testing_engine()
 
249
        engine.connect()
 
250
        engine.dialect.supports_native_enum = False
 
251
        metadata = MetaData()
 
252
        t1 = Table('foo', metadata, Column('bar', Enum('one', 'two',
 
253
                   'three', name='myenum')))
 
254
 
 
255
        def go():
 
256
            t1.create(engine)
 
257
 
 
258
        try:
 
259
            self.assert_sql(engine, go, [],
 
260
                            with_sequences=[("CREATE TABLE foo (\tbar "
 
261
                            "VARCHAR(5), \tCONSTRAINT myenum CHECK "
 
262
                            "(bar IN ('one', 'two', 'three')))", {})])
 
263
        finally:
 
264
            metadata.drop_all(engine)
 
265
 
 
266
    def test_standalone_enum(self):
 
267
        metadata = MetaData(testing.db)
 
268
        etype = Enum('four', 'five', 'six', name='fourfivesixtype',
 
269
                     metadata=metadata)
 
270
        etype.create()
 
271
        try:
 
272
            assert testing.db.dialect.has_type(testing.db,
 
273
                    'fourfivesixtype')
 
274
        finally:
 
275
            etype.drop()
 
276
            assert not testing.db.dialect.has_type(testing.db,
 
277
                    'fourfivesixtype')
 
278
        metadata.create_all()
 
279
        try:
 
280
            assert testing.db.dialect.has_type(testing.db,
 
281
                    'fourfivesixtype')
 
282
        finally:
 
283
            metadata.drop_all()
 
284
            assert not testing.db.dialect.has_type(testing.db,
 
285
                    'fourfivesixtype')
 
286
 
 
287
    def test_no_support(self):
 
288
        def server_version_info(self):
 
289
            return (8, 2)
 
290
 
 
291
        e = engines.testing_engine()
 
292
        dialect = e.dialect
 
293
        dialect._get_server_version_info = server_version_info
 
294
 
 
295
        assert dialect.supports_native_enum
 
296
        e.connect()
 
297
        assert not dialect.supports_native_enum
 
298
 
 
299
        # initialize is called again on new pool
 
300
        e.dispose()
 
301
        e.connect()
 
302
        assert not dialect.supports_native_enum
 
303
 
 
304
 
 
305
    def test_reflection(self):
 
306
        metadata = MetaData(testing.db)
 
307
        etype = Enum('four', 'five', 'six', name='fourfivesixtype',
 
308
                     metadata=metadata)
 
309
        t1 = Table('table', metadata, Column('id', Integer,
 
310
                   primary_key=True), Column('value', Enum('one', 'two'
 
311
                   , 'three', name='onetwothreetype')), Column('value2'
 
312
                   , etype))
 
313
        metadata.create_all()
 
314
        try:
 
315
            m2 = MetaData(testing.db)
 
316
            t2 = Table('table', m2, autoload=True)
 
317
            assert t2.c.value.type.enums == ('one', 'two', 'three')
 
318
            assert t2.c.value.type.name == 'onetwothreetype'
 
319
            assert t2.c.value2.type.enums == ('four', 'five', 'six')
 
320
            assert t2.c.value2.type.name == 'fourfivesixtype'
 
321
        finally:
 
322
            metadata.drop_all()
 
323
 
 
324
    def test_schema_reflection(self):
 
325
        metadata = MetaData(testing.db)
 
326
        etype = Enum(
 
327
            'four',
 
328
            'five',
 
329
            'six',
 
330
            name='fourfivesixtype',
 
331
            schema='test_schema',
 
332
            metadata=metadata,
 
333
            )
 
334
        t1 = Table('table', metadata, Column('id', Integer,
 
335
                   primary_key=True), Column('value', Enum('one', 'two'
 
336
                   , 'three', name='onetwothreetype',
 
337
                   schema='test_schema')), Column('value2', etype))
 
338
        metadata.create_all()
 
339
        try:
 
340
            m2 = MetaData(testing.db)
 
341
            t2 = Table('table', m2, autoload=True)
 
342
            assert t2.c.value.type.enums == ('one', 'two', 'three')
 
343
            assert t2.c.value.type.name == 'onetwothreetype'
 
344
            assert t2.c.value2.type.enums == ('four', 'five', 'six')
 
345
            assert t2.c.value2.type.name == 'fourfivesixtype'
 
346
            assert t2.c.value2.type.schema == 'test_schema'
 
347
        finally:
 
348
            metadata.drop_all()
 
349
 
 
350
class NumericInterpretationTest(fixtures.TestBase):
 
351
    __only_on__ = 'postgresql'
 
352
 
 
353
    def test_numeric_codes(self):
 
354
        from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base
 
355
 
 
356
        for dialect in (pg8000.dialect(), psycopg2.dialect()):
 
357
 
 
358
            typ = Numeric().dialect_impl(dialect)
 
359
            for code in base._INT_TYPES + base._FLOAT_TYPES + \
 
360
                        base._DECIMAL_TYPES:
 
361
                proc = typ.result_processor(dialect, code)
 
362
                val = 23.7
 
363
                if proc is not None:
 
364
                    val = proc(val)
 
365
                assert val in (23.7, decimal.Decimal("23.7"))
 
366
 
 
367
    @testing.provide_metadata
 
368
    def test_numeric_default(self):
 
369
        metadata = self.metadata
 
370
        # pg8000 appears to fail when the value is 0,
 
371
        # returns an int instead of decimal.
 
372
        t =Table('t', metadata,
 
373
            Column('id', Integer, primary_key=True),
 
374
            Column('nd', Numeric(asdecimal=True), default=1),
 
375
            Column('nf', Numeric(asdecimal=False), default=1),
 
376
            Column('fd', Float(asdecimal=True), default=1),
 
377
            Column('ff', Float(asdecimal=False), default=1),
 
378
        )
 
379
        metadata.create_all()
 
380
        r = t.insert().execute()
 
381
 
 
382
        row = t.select().execute().first()
 
383
        assert isinstance(row[1], decimal.Decimal)
 
384
        assert isinstance(row[2], float)
 
385
        assert isinstance(row[3], decimal.Decimal)
 
386
        assert isinstance(row[4], float)
 
387
        eq_(
 
388
            row,
 
389
            (1, decimal.Decimal("1"), 1, decimal.Decimal("1"), 1)
 
390
        )
 
391
 
 
392
class TimezoneTest(fixtures.TestBase):
 
393
 
 
394
    """Test timezone-aware datetimes.
 
395
 
 
396
    psycopg will return a datetime with a tzinfo attached to it, if
 
397
    postgresql returns it.  python then will not let you compare a
 
398
    datetime with a tzinfo to a datetime that doesnt have one.  this
 
399
    test illustrates two ways to have datetime types with and without
 
400
    timezone info. """
 
401
 
 
402
    __only_on__ = 'postgresql'
 
403
 
 
404
    @classmethod
 
405
    def setup_class(cls):
 
406
        global tztable, notztable, metadata
 
407
        metadata = MetaData(testing.db)
 
408
 
 
409
        # current_timestamp() in postgresql is assumed to return
 
410
        # TIMESTAMP WITH TIMEZONE
 
411
 
 
412
        tztable = Table('tztable', metadata, Column('id', Integer,
 
413
                        primary_key=True), Column('date',
 
414
                        DateTime(timezone=True),
 
415
                        onupdate=func.current_timestamp()),
 
416
                        Column('name', String(20)))
 
417
        notztable = Table('notztable', metadata, Column('id', Integer,
 
418
                          primary_key=True), Column('date',
 
419
                          DateTime(timezone=False),
 
420
                          onupdate=cast(func.current_timestamp(),
 
421
                          DateTime(timezone=False))), Column('name',
 
422
                          String(20)))
 
423
        metadata.create_all()
 
424
 
 
425
    @classmethod
 
426
    def teardown_class(cls):
 
427
        metadata.drop_all()
 
428
 
 
429
    @testing.fails_on('postgresql+zxjdbc',
 
430
                      "XXX: postgresql+zxjdbc doesn't give a tzinfo back")
 
431
    def test_with_timezone(self):
 
432
 
 
433
        # get a date with a tzinfo
 
434
 
 
435
        somedate = \
 
436
            testing.db.connect().scalar(func.current_timestamp().select())
 
437
        assert somedate.tzinfo
 
438
        tztable.insert().execute(id=1, name='row1', date=somedate)
 
439
        row = select([tztable.c.date], tztable.c.id
 
440
                     == 1).execute().first()
 
441
        eq_(row[0], somedate)
 
442
        eq_(somedate.tzinfo.utcoffset(somedate),
 
443
            row[0].tzinfo.utcoffset(row[0]))
 
444
        result = tztable.update(tztable.c.id
 
445
                                == 1).returning(tztable.c.date).\
 
446
                                    execute(name='newname'
 
447
                )
 
448
        row = result.first()
 
449
        assert row[0] >= somedate
 
450
 
 
451
    def test_without_timezone(self):
 
452
 
 
453
        # get a date without a tzinfo
 
454
 
 
455
        somedate = datetime.datetime( 2005, 10, 20, 11, 52, 0, )
 
456
        assert not somedate.tzinfo
 
457
        notztable.insert().execute(id=1, name='row1', date=somedate)
 
458
        row = select([notztable.c.date], notztable.c.id
 
459
                     == 1).execute().first()
 
460
        eq_(row[0], somedate)
 
461
        eq_(row[0].tzinfo, None)
 
462
        result = notztable.update(notztable.c.id
 
463
                                  == 1).returning(notztable.c.date).\
 
464
                                    execute(name='newname'
 
465
                )
 
466
        row = result.first()
 
467
        assert row[0] >= somedate
 
468
 
 
469
class TimePrecisionTest(fixtures.TestBase, AssertsCompiledSQL):
 
470
 
 
471
    __dialect__ = postgresql.dialect()
 
472
 
 
473
    def test_compile(self):
 
474
        for type_, expected in [
 
475
            (postgresql.TIME(), 'TIME WITHOUT TIME ZONE'),
 
476
            (postgresql.TIME(precision=5), 'TIME(5) WITHOUT TIME ZONE'
 
477
             ),
 
478
            (postgresql.TIME(timezone=True, precision=5),
 
479
             'TIME(5) WITH TIME ZONE'),
 
480
            (postgresql.TIMESTAMP(), 'TIMESTAMP WITHOUT TIME ZONE'),
 
481
            (postgresql.TIMESTAMP(precision=5),
 
482
             'TIMESTAMP(5) WITHOUT TIME ZONE'),
 
483
            (postgresql.TIMESTAMP(timezone=True, precision=5),
 
484
             'TIMESTAMP(5) WITH TIME ZONE'),
 
485
            ]:
 
486
            self.assert_compile(type_, expected)
 
487
 
 
488
    @testing.only_on('postgresql', 'DB specific feature')
 
489
    @testing.provide_metadata
 
490
    def test_reflection(self):
 
491
        metadata = self.metadata
 
492
        t1 = Table(
 
493
            't1',
 
494
            metadata,
 
495
            Column('c1', postgresql.TIME()),
 
496
            Column('c2', postgresql.TIME(precision=5)),
 
497
            Column('c3', postgresql.TIME(timezone=True, precision=5)),
 
498
            Column('c4', postgresql.TIMESTAMP()),
 
499
            Column('c5', postgresql.TIMESTAMP(precision=5)),
 
500
            Column('c6', postgresql.TIMESTAMP(timezone=True,
 
501
                   precision=5)),
 
502
            )
 
503
        t1.create()
 
504
        m2 = MetaData(testing.db)
 
505
        t2 = Table('t1', m2, autoload=True)
 
506
        eq_(t2.c.c1.type.precision, None)
 
507
        eq_(t2.c.c2.type.precision, 5)
 
508
        eq_(t2.c.c3.type.precision, 5)
 
509
        eq_(t2.c.c4.type.precision, None)
 
510
        eq_(t2.c.c5.type.precision, 5)
 
511
        eq_(t2.c.c6.type.precision, 5)
 
512
        eq_(t2.c.c1.type.timezone, False)
 
513
        eq_(t2.c.c2.type.timezone, False)
 
514
        eq_(t2.c.c3.type.timezone, True)
 
515
        eq_(t2.c.c4.type.timezone, False)
 
516
        eq_(t2.c.c5.type.timezone, False)
 
517
        eq_(t2.c.c6.type.timezone, True)
 
518
 
 
519
class ArrayTest(fixtures.TablesTest, AssertsExecutionResults):
 
520
 
 
521
    __only_on__ = 'postgresql'
 
522
 
 
523
    __unsupported_on__ = 'postgresql+pg8000', 'postgresql+zxjdbc'
 
524
 
 
525
    @classmethod
 
526
    def define_tables(cls, metadata):
 
527
 
 
528
        class ProcValue(TypeDecorator):
 
529
            impl = postgresql.ARRAY(Integer, dimensions=2)
 
530
 
 
531
            def process_bind_param(self, value, dialect):
 
532
                if value is None:
 
533
                    return None
 
534
                return [
 
535
                    [x + 5 for x in v]
 
536
                    for v in value
 
537
                ]
 
538
 
 
539
            def process_result_value(self, value, dialect):
 
540
                if value is None:
 
541
                    return None
 
542
                return [
 
543
                    [x - 7 for x in v]
 
544
                    for v in value
 
545
                ]
 
546
 
 
547
        Table('arrtable', metadata,
 
548
                        Column('id', Integer, primary_key=True),
 
549
                        Column('intarr', postgresql.ARRAY(Integer)),
 
550
                         Column('strarr', postgresql.ARRAY(Unicode())),
 
551
                        Column('dimarr', ProcValue)
 
552
                    )
 
553
 
 
554
        Table('dim_arrtable', metadata,
 
555
                        Column('id', Integer, primary_key=True),
 
556
                        Column('intarr', postgresql.ARRAY(Integer, dimensions=1)),
 
557
                         Column('strarr', postgresql.ARRAY(Unicode(), dimensions=1)),
 
558
                        Column('dimarr', ProcValue)
 
559
                    )
 
560
 
 
561
    def _fixture_456(self, table):
 
562
        testing.db.execute(
 
563
                table.insert(),
 
564
                intarr=[4, 5, 6]
 
565
        )
 
566
 
 
567
    def test_reflect_array_column(self):
 
568
        metadata2 = MetaData(testing.db)
 
569
        tbl = Table('arrtable', metadata2, autoload=True)
 
570
        assert isinstance(tbl.c.intarr.type, postgresql.ARRAY)
 
571
        assert isinstance(tbl.c.strarr.type, postgresql.ARRAY)
 
572
        assert isinstance(tbl.c.intarr.type.item_type, Integer)
 
573
        assert isinstance(tbl.c.strarr.type.item_type, String)
 
574
 
 
575
    def test_insert_array(self):
 
576
        arrtable = self.tables.arrtable
 
577
        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
 
578
                                  util.u('def')])
 
579
        results = arrtable.select().execute().fetchall()
 
580
        eq_(len(results), 1)
 
581
        eq_(results[0]['intarr'], [1, 2, 3])
 
582
        eq_(results[0]['strarr'], [util.u('abc'), util.u('def')])
 
583
 
 
584
    def test_array_where(self):
 
585
        arrtable = self.tables.arrtable
 
586
        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
 
587
                                  util.u('def')])
 
588
        arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC'))
 
589
        results = arrtable.select().where(arrtable.c.intarr == [1, 2,
 
590
                3]).execute().fetchall()
 
591
        eq_(len(results), 1)
 
592
        eq_(results[0]['intarr'], [1, 2, 3])
 
593
 
 
594
    def test_array_concat(self):
 
595
        arrtable = self.tables.arrtable
 
596
        arrtable.insert().execute(intarr=[1, 2, 3],
 
597
                    strarr=[util.u('abc'), util.u('def')])
 
598
        results = select([arrtable.c.intarr + [4, 5,
 
599
                         6]]).execute().fetchall()
 
600
        eq_(len(results), 1)
 
601
        eq_(results[0][0], [ 1, 2, 3, 4, 5, 6, ])
 
602
 
 
603
    def test_array_subtype_resultprocessor(self):
 
604
        arrtable = self.tables.arrtable
 
605
        arrtable.insert().execute(intarr=[4, 5, 6],
 
606
                                  strarr=[[util.ue('m\xe4\xe4')], [
 
607
                                    util.ue('m\xf6\xf6')]])
 
608
        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[
 
609
                        util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')])
 
610
        results = \
 
611
            arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
 
612
        eq_(len(results), 2)
 
613
        eq_(results[0]['strarr'], [util.ue('m\xe4\xe4'), util.ue('m\xf6\xf6')])
 
614
        eq_(results[1]['strarr'], [[util.ue('m\xe4\xe4')], [util.ue('m\xf6\xf6')]])
 
615
 
 
616
    def test_array_literal(self):
 
617
        eq_(
 
618
            testing.db.scalar(
 
619
                select([
 
620
                    postgresql.array([1, 2]) + postgresql.array([3, 4, 5])
 
621
                ])
 
622
                ), [1,2,3,4,5]
 
623
        )
 
624
 
 
625
    def test_array_getitem_single_type(self):
 
626
        arrtable = self.tables.arrtable
 
627
        is_(arrtable.c.intarr[1].type._type_affinity, Integer)
 
628
        is_(arrtable.c.strarr[1].type._type_affinity, String)
 
629
 
 
630
    def test_array_getitem_slice_type(self):
 
631
        arrtable = self.tables.arrtable
 
632
        is_(arrtable.c.intarr[1:3].type._type_affinity, postgresql.ARRAY)
 
633
        is_(arrtable.c.strarr[1:3].type._type_affinity, postgresql.ARRAY)
 
634
 
 
635
    def test_array_getitem_single_exec(self):
 
636
        arrtable = self.tables.arrtable
 
637
        self._fixture_456(arrtable)
 
638
        eq_(
 
639
            testing.db.scalar(select([arrtable.c.intarr[2]])),
 
640
            5
 
641
        )
 
642
        testing.db.execute(
 
643
            arrtable.update().values({arrtable.c.intarr[2]: 7})
 
644
        )
 
645
        eq_(
 
646
            testing.db.scalar(select([arrtable.c.intarr[2]])),
 
647
            7
 
648
        )
 
649
 
 
650
    def test_undim_array_empty(self):
 
651
        arrtable = self.tables.arrtable
 
652
        self._fixture_456(arrtable)
 
653
        eq_(
 
654
            testing.db.scalar(
 
655
                select([arrtable.c.intarr]).
 
656
                    where(arrtable.c.intarr.contains([]))
 
657
            ),
 
658
            [4, 5, 6]
 
659
        )
 
660
 
 
661
    def test_array_getitem_slice_exec(self):
 
662
        arrtable = self.tables.arrtable
 
663
        testing.db.execute(
 
664
            arrtable.insert(),
 
665
            intarr=[4, 5, 6],
 
666
            strarr=[util.u('abc'), util.u('def')]
 
667
        )
 
668
        eq_(
 
669
            testing.db.scalar(select([arrtable.c.intarr[2:3]])),
 
670
            [5, 6]
 
671
        )
 
672
        testing.db.execute(
 
673
            arrtable.update().values({arrtable.c.intarr[2:3]: [7, 8]})
 
674
        )
 
675
        eq_(
 
676
            testing.db.scalar(select([arrtable.c.intarr[2:3]])),
 
677
            [7, 8]
 
678
        )
 
679
 
 
680
 
 
681
    def _test_undim_array_contains_typed_exec(self, struct):
 
682
        arrtable = self.tables.arrtable
 
683
        self._fixture_456(arrtable)
 
684
        eq_(
 
685
            testing.db.scalar(
 
686
                select([arrtable.c.intarr]).
 
687
                    where(arrtable.c.intarr.contains(struct([4, 5])))
 
688
            ),
 
689
            [4, 5, 6]
 
690
        )
 
691
 
 
692
    def test_undim_array_contains_set_exec(self):
 
693
        self._test_undim_array_contains_typed_exec(set)
 
694
 
 
695
    def test_undim_array_contains_list_exec(self):
 
696
        self._test_undim_array_contains_typed_exec(list)
 
697
 
 
698
    def test_undim_array_contains_generator_exec(self):
 
699
        self._test_undim_array_contains_typed_exec(
 
700
                    lambda elem: (x for x in elem))
 
701
 
 
702
    def _test_dim_array_contains_typed_exec(self, struct):
 
703
        dim_arrtable = self.tables.dim_arrtable
 
704
        self._fixture_456(dim_arrtable)
 
705
        eq_(
 
706
            testing.db.scalar(
 
707
                select([dim_arrtable.c.intarr]).
 
708
                    where(dim_arrtable.c.intarr.contains(struct([4, 5])))
 
709
            ),
 
710
            [4, 5, 6]
 
711
        )
 
712
 
 
713
    def test_dim_array_contains_set_exec(self):
 
714
        self._test_dim_array_contains_typed_exec(set)
 
715
 
 
716
    def test_dim_array_contains_list_exec(self):
 
717
        self._test_dim_array_contains_typed_exec(list)
 
718
 
 
719
    def test_dim_array_contains_generator_exec(self):
 
720
        self._test_dim_array_contains_typed_exec(lambda elem: (x for x in elem))
 
721
 
 
722
    def test_array_contained_by_exec(self):
 
723
        arrtable = self.tables.arrtable
 
724
        with testing.db.connect() as conn:
 
725
            conn.execute(
 
726
                arrtable.insert(),
 
727
                intarr=[6, 5, 4]
 
728
            )
 
729
            eq_(
 
730
                conn.scalar(
 
731
                    select([arrtable.c.intarr.contained_by([4, 5, 6, 7])])
 
732
                ),
 
733
                True
 
734
            )
 
735
 
 
736
    def test_array_overlap_exec(self):
 
737
        arrtable = self.tables.arrtable
 
738
        with testing.db.connect() as conn:
 
739
            conn.execute(
 
740
                arrtable.insert(),
 
741
                intarr=[4, 5, 6]
 
742
            )
 
743
            eq_(
 
744
                conn.scalar(
 
745
                    select([arrtable.c.intarr]).
 
746
                        where(arrtable.c.intarr.overlap([7, 6]))
 
747
                ),
 
748
                [4, 5, 6]
 
749
            )
 
750
 
 
751
    def test_array_any_exec(self):
 
752
        arrtable = self.tables.arrtable
 
753
        with testing.db.connect() as conn:
 
754
            conn.execute(
 
755
                arrtable.insert(),
 
756
                intarr=[4, 5, 6]
 
757
            )
 
758
            eq_(
 
759
                conn.scalar(
 
760
                    select([arrtable.c.intarr]).
 
761
                        where(postgresql.Any(5, arrtable.c.intarr))
 
762
                ),
 
763
                [4, 5, 6]
 
764
            )
 
765
 
 
766
    def test_array_all_exec(self):
 
767
        arrtable = self.tables.arrtable
 
768
        with testing.db.connect() as conn:
 
769
            conn.execute(
 
770
                arrtable.insert(),
 
771
                intarr=[4, 5, 6]
 
772
            )
 
773
            eq_(
 
774
                conn.scalar(
 
775
                    select([arrtable.c.intarr]).
 
776
                        where(arrtable.c.intarr.all(4, operator=operators.le))
 
777
                ),
 
778
                [4, 5, 6]
 
779
            )
 
780
 
 
781
 
 
782
    @testing.provide_metadata
 
783
    def test_tuple_flag(self):
 
784
        metadata = self.metadata
 
785
 
 
786
        t1 = Table('t1', metadata,
 
787
            Column('id', Integer, primary_key=True),
 
788
            Column('data', postgresql.ARRAY(String(5), as_tuple=True)),
 
789
            Column('data2', postgresql.ARRAY(Numeric(asdecimal=False), as_tuple=True)),
 
790
        )
 
791
        metadata.create_all()
 
792
        testing.db.execute(t1.insert(), id=1, data=["1","2","3"], data2=[5.4, 5.6])
 
793
        testing.db.execute(t1.insert(), id=2, data=["4", "5", "6"], data2=[1.0])
 
794
        testing.db.execute(t1.insert(), id=3, data=[["4", "5"], ["6", "7"]],
 
795
                        data2=[[5.4, 5.6], [1.0, 1.1]])
 
796
 
 
797
        r = testing.db.execute(t1.select().order_by(t1.c.id)).fetchall()
 
798
        eq_(
 
799
            r,
 
800
            [
 
801
                (1, ('1', '2', '3'), (5.4, 5.6)),
 
802
                (2, ('4', '5', '6'), (1.0,)),
 
803
                (3, (('4', '5'), ('6', '7')), ((5.4, 5.6), (1.0, 1.1)))
 
804
            ]
 
805
        )
 
806
        # hashable
 
807
        eq_(
 
808
            set(row[1] for row in r),
 
809
            set([('1', '2', '3'), ('4', '5', '6'), (('4', '5'), ('6', '7'))])
 
810
        )
 
811
 
 
812
    def test_dimension(self):
 
813
        arrtable = self.tables.arrtable
 
814
        testing.db.execute(arrtable.insert(), dimarr=[[1, 2, 3], [4,5, 6]])
 
815
        eq_(
 
816
            testing.db.scalar(select([arrtable.c.dimarr])),
 
817
            [[-1, 0, 1], [2, 3, 4]]
 
818
        )
 
819
 
 
820
class TimestampTest(fixtures.TestBase, AssertsExecutionResults):
 
821
    __only_on__ = 'postgresql'
 
822
 
 
823
    def test_timestamp(self):
 
824
        engine = testing.db
 
825
        connection = engine.connect()
 
826
 
 
827
        s = select(["timestamp '2007-12-25'"])
 
828
        result = connection.execute(s).first()
 
829
        eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))
 
830
 
 
831
 
 
832
class SpecialTypesTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
 
833
    """test DDL and reflection of PG-specific types """
 
834
 
 
835
    __only_on__ = 'postgresql'
 
836
    __excluded_on__ = (('postgresql', '<', (8, 3, 0)),)
 
837
 
 
838
    @classmethod
 
839
    def setup_class(cls):
 
840
        global metadata, table
 
841
        metadata = MetaData(testing.db)
 
842
 
 
843
        # create these types so that we can issue
 
844
        # special SQL92 INTERVAL syntax
 
845
        class y2m(types.UserDefinedType, postgresql.INTERVAL):
 
846
            def get_col_spec(self):
 
847
                return "INTERVAL YEAR TO MONTH"
 
848
 
 
849
        class d2s(types.UserDefinedType, postgresql.INTERVAL):
 
850
            def get_col_spec(self):
 
851
                return "INTERVAL DAY TO SECOND"
 
852
 
 
853
        table = Table('sometable', metadata,
 
854
            Column('id', postgresql.UUID, primary_key=True),
 
855
            Column('flag', postgresql.BIT),
 
856
            Column('bitstring', postgresql.BIT(4)),
 
857
            Column('addr', postgresql.INET),
 
858
            Column('addr2', postgresql.MACADDR),
 
859
            Column('addr3', postgresql.CIDR),
 
860
            Column('doubleprec', postgresql.DOUBLE_PRECISION),
 
861
            Column('plain_interval', postgresql.INTERVAL),
 
862
            Column('year_interval', y2m()),
 
863
            Column('month_interval', d2s()),
 
864
            Column('precision_interval', postgresql.INTERVAL(precision=3))
 
865
        )
 
866
 
 
867
        metadata.create_all()
 
868
 
 
869
        # cheat so that the "strict type check"
 
870
        # works
 
871
        table.c.year_interval.type = postgresql.INTERVAL()
 
872
        table.c.month_interval.type = postgresql.INTERVAL()
 
873
 
 
874
    @classmethod
 
875
    def teardown_class(cls):
 
876
        metadata.drop_all()
 
877
 
 
878
    def test_reflection(self):
 
879
        m = MetaData(testing.db)
 
880
        t = Table('sometable', m, autoload=True)
 
881
 
 
882
        self.assert_tables_equal(table, t, strict_types=True)
 
883
        assert t.c.plain_interval.type.precision is None
 
884
        assert t.c.precision_interval.type.precision == 3
 
885
        assert t.c.bitstring.type.length == 4
 
886
 
 
887
    def test_bit_compile(self):
 
888
        pairs = [(postgresql.BIT(), 'BIT(1)'),
 
889
                 (postgresql.BIT(5), 'BIT(5)'),
 
890
                 (postgresql.BIT(varying=True), 'BIT VARYING'),
 
891
                 (postgresql.BIT(5, varying=True), 'BIT VARYING(5)'),
 
892
                ]
 
893
        for type_, expected in pairs:
 
894
            self.assert_compile(type_, expected)
 
895
 
 
896
    @testing.provide_metadata
 
897
    def test_bit_reflection(self):
 
898
        metadata = self.metadata
 
899
        t1 = Table('t1', metadata,
 
900
        Column('bit1', postgresql.BIT()),
 
901
        Column('bit5', postgresql.BIT(5)),
 
902
        Column('bitvarying', postgresql.BIT(varying=True)),
 
903
        Column('bitvarying5', postgresql.BIT(5, varying=True)),
 
904
        )
 
905
        t1.create()
 
906
        m2 = MetaData(testing.db)
 
907
        t2 = Table('t1', m2, autoload=True)
 
908
        eq_(t2.c.bit1.type.length, 1)
 
909
        eq_(t2.c.bit1.type.varying, False)
 
910
        eq_(t2.c.bit5.type.length, 5)
 
911
        eq_(t2.c.bit5.type.varying, False)
 
912
        eq_(t2.c.bitvarying.type.length, None)
 
913
        eq_(t2.c.bitvarying.type.varying, True)
 
914
        eq_(t2.c.bitvarying5.type.length, 5)
 
915
        eq_(t2.c.bitvarying5.type.varying, True)
 
916
 
 
917
class UUIDTest(fixtures.TestBase):
 
918
    """Test the bind/return values of the UUID type."""
 
919
 
 
920
    __only_on__ = 'postgresql'
 
921
 
 
922
    @testing.requires.python25
 
923
    @testing.fails_on('postgresql+zxjdbc',
 
924
                      'column "data" is of type uuid but expression is of type character varying')
 
925
    @testing.fails_on('postgresql+pg8000', 'No support for UUID type')
 
926
    def test_uuid_string(self):
 
927
        import uuid
 
928
        self._test_round_trip(
 
929
            Table('utable', MetaData(),
 
930
                Column('data', postgresql.UUID())
 
931
            ),
 
932
            str(uuid.uuid4()),
 
933
            str(uuid.uuid4())
 
934
        )
 
935
 
 
936
    @testing.requires.python25
 
937
    @testing.fails_on('postgresql+zxjdbc',
 
938
                      'column "data" is of type uuid but expression is of type character varying')
 
939
    @testing.fails_on('postgresql+pg8000', 'No support for UUID type')
 
940
    def test_uuid_uuid(self):
 
941
        import uuid
 
942
        self._test_round_trip(
 
943
            Table('utable', MetaData(),
 
944
                Column('data', postgresql.UUID(as_uuid=True))
 
945
            ),
 
946
            uuid.uuid4(),
 
947
            uuid.uuid4()
 
948
        )
 
949
 
 
950
    def test_no_uuid_available(self):
 
951
        from sqlalchemy.dialects.postgresql import base
 
952
        uuid_type = base._python_UUID
 
953
        base._python_UUID = None
 
954
        try:
 
955
            assert_raises(
 
956
                NotImplementedError,
 
957
                postgresql.UUID, as_uuid=True
 
958
            )
 
959
        finally:
 
960
            base._python_UUID = uuid_type
 
961
 
 
962
    def setup(self):
 
963
        self.conn = testing.db.connect()
 
964
        trans = self.conn.begin()
 
965
 
 
966
    def teardown(self):
 
967
        self.conn.close()
 
968
 
 
969
    def _test_round_trip(self, utable, value1, value2):
 
970
        utable.create(self.conn)
 
971
        self.conn.execute(utable.insert(), {'data':value1})
 
972
        self.conn.execute(utable.insert(), {'data':value2})
 
973
        r = self.conn.execute(
 
974
                select([utable.c.data]).
 
975
                    where(utable.c.data != value1)
 
976
                )
 
977
        eq_(r.fetchone()[0], value2)
 
978
        eq_(r.fetchone(), None)
 
979
 
 
980
 
 
981
 
 
982
class HStoreTest(fixtures.TestBase):
 
983
    def _assert_sql(self, construct, expected):
 
984
        dialect = postgresql.dialect()
 
985
        compiled = str(construct.compile(dialect=dialect))
 
986
        compiled = re.sub(r'\s+', ' ', compiled)
 
987
        expected = re.sub(r'\s+', ' ', expected)
 
988
        eq_(compiled, expected)
 
989
 
 
990
    def setup(self):
 
991
        metadata = MetaData()
 
992
        self.test_table = Table('test_table', metadata,
 
993
            Column('id', Integer, primary_key=True),
 
994
            Column('hash', HSTORE)
 
995
        )
 
996
        self.hashcol = self.test_table.c.hash
 
997
 
 
998
    def _test_where(self, whereclause, expected):
 
999
        stmt = select([self.test_table]).where(whereclause)
 
1000
        self._assert_sql(
 
1001
            stmt,
 
1002
            "SELECT test_table.id, test_table.hash FROM test_table "
 
1003
            "WHERE %s" % expected
 
1004
        )
 
1005
 
 
1006
    def _test_cols(self, colclause, expected, from_=True):
 
1007
        stmt = select([colclause])
 
1008
        self._assert_sql(
 
1009
            stmt,
 
1010
            (
 
1011
                "SELECT %s" +
 
1012
                (" FROM test_table" if from_ else "")
 
1013
            ) % expected
 
1014
        )
 
1015
 
 
1016
    def test_bind_serialize_default(self):
 
1017
        from sqlalchemy.engine import default
 
1018
 
 
1019
        dialect = default.DefaultDialect()
 
1020
        proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
 
1021
        eq_(
 
1022
            proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
 
1023
            '"key1"=>"value1", "key2"=>"value2"'
 
1024
        )
 
1025
 
 
1026
    def test_bind_serialize_with_slashes_and_quotes(self):
 
1027
        from sqlalchemy.engine import default
 
1028
 
 
1029
        dialect = default.DefaultDialect()
 
1030
        proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
 
1031
        eq_(
 
1032
            proc({'\\"a': '\\"1'}),
 
1033
            '"\\\\\\"a"=>"\\\\\\"1"'
 
1034
        )
 
1035
 
 
1036
    def test_parse_error(self):
 
1037
        from sqlalchemy.engine import default
 
1038
 
 
1039
        dialect = default.DefaultDialect()
 
1040
        proc = self.test_table.c.hash.type._cached_result_processor(
 
1041
                    dialect, None)
 
1042
        assert_raises_message(
 
1043
            ValueError,
 
1044
            r'''After u?'\[\.\.\.\], "key1"=>"value1", ', could not parse '''
 
1045
            '''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''',
 
1046
            proc,
 
1047
            '"key2"=>"value2", "key1"=>"value1", '
 
1048
                        'crapcrapcrap, "key3"=>"value3"'
 
1049
        )
 
1050
 
 
1051
    def test_result_deserialize_default(self):
 
1052
        from sqlalchemy.engine import default
 
1053
 
 
1054
        dialect = default.DefaultDialect()
 
1055
        proc = self.test_table.c.hash.type._cached_result_processor(
 
1056
                    dialect, None)
 
1057
        eq_(
 
1058
            proc('"key2"=>"value2", "key1"=>"value1"'),
 
1059
            {"key1": "value1", "key2": "value2"}
 
1060
        )
 
1061
 
 
1062
    def test_result_deserialize_with_slashes_and_quotes(self):
 
1063
        from sqlalchemy.engine import default
 
1064
 
 
1065
        dialect = default.DefaultDialect()
 
1066
        proc = self.test_table.c.hash.type._cached_result_processor(
 
1067
                    dialect, None)
 
1068
        eq_(
 
1069
            proc('"\\\\\\"a"=>"\\\\\\"1"'),
 
1070
            {'\\"a': '\\"1'}
 
1071
        )
 
1072
 
 
1073
    def test_bind_serialize_psycopg2(self):
 
1074
        from sqlalchemy.dialects.postgresql import psycopg2
 
1075
 
 
1076
        dialect = psycopg2.PGDialect_psycopg2()
 
1077
        dialect._has_native_hstore = True
 
1078
        proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
 
1079
        is_(proc, None)
 
1080
 
 
1081
        dialect = psycopg2.PGDialect_psycopg2()
 
1082
        dialect._has_native_hstore = False
 
1083
        proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
 
1084
        eq_(
 
1085
            proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
 
1086
            '"key1"=>"value1", "key2"=>"value2"'
 
1087
        )
 
1088
 
 
1089
    def test_result_deserialize_psycopg2(self):
 
1090
        from sqlalchemy.dialects.postgresql import psycopg2
 
1091
 
 
1092
        dialect = psycopg2.PGDialect_psycopg2()
 
1093
        dialect._has_native_hstore = True
 
1094
        proc = self.test_table.c.hash.type._cached_result_processor(
 
1095
                    dialect, None)
 
1096
        is_(proc, None)
 
1097
 
 
1098
        dialect = psycopg2.PGDialect_psycopg2()
 
1099
        dialect._has_native_hstore = False
 
1100
        proc = self.test_table.c.hash.type._cached_result_processor(
 
1101
                    dialect, None)
 
1102
        eq_(
 
1103
            proc('"key2"=>"value2", "key1"=>"value1"'),
 
1104
            {"key1": "value1", "key2": "value2"}
 
1105
        )
 
1106
 
 
1107
    def test_where_has_key(self):
 
1108
        self._test_where(
 
1109
            # hide from 2to3
 
1110
            getattr(self.hashcol, 'has_key')('foo'),
 
1111
            "test_table.hash ? %(hash_1)s"
 
1112
        )
 
1113
 
 
1114
    def test_where_has_all(self):
 
1115
        self._test_where(
 
1116
            self.hashcol.has_all(postgresql.array(['1', '2'])),
 
1117
            "test_table.hash ?& ARRAY[%(param_1)s, %(param_2)s]"
 
1118
        )
 
1119
 
 
1120
    def test_where_has_any(self):
 
1121
        self._test_where(
 
1122
            self.hashcol.has_any(postgresql.array(['1', '2'])),
 
1123
            "test_table.hash ?| ARRAY[%(param_1)s, %(param_2)s]"
 
1124
        )
 
1125
 
 
1126
    def test_where_defined(self):
 
1127
        self._test_where(
 
1128
            self.hashcol.defined('foo'),
 
1129
            "defined(test_table.hash, %(param_1)s)"
 
1130
        )
 
1131
 
 
1132
    def test_where_contains(self):
 
1133
        self._test_where(
 
1134
            self.hashcol.contains({'foo': '1'}),
 
1135
            "test_table.hash @> %(hash_1)s"
 
1136
        )
 
1137
 
 
1138
    def test_where_contained_by(self):
 
1139
        self._test_where(
 
1140
            self.hashcol.contained_by({'foo': '1', 'bar': None}),
 
1141
            "test_table.hash <@ %(hash_1)s"
 
1142
        )
 
1143
 
 
1144
    def test_where_getitem(self):
 
1145
        self._test_where(
 
1146
            self.hashcol['bar'] == None,
 
1147
            "(test_table.hash -> %(hash_1)s) IS NULL"
 
1148
        )
 
1149
 
 
1150
    def test_cols_get(self):
 
1151
        self._test_cols(
 
1152
            self.hashcol['foo'],
 
1153
            "test_table.hash -> %(hash_1)s AS anon_1",
 
1154
            True
 
1155
        )
 
1156
 
 
1157
    def test_cols_delete_single_key(self):
 
1158
        self._test_cols(
 
1159
            self.hashcol.delete('foo'),
 
1160
            "delete(test_table.hash, %(param_1)s) AS delete_1",
 
1161
            True
 
1162
        )
 
1163
 
 
1164
    def test_cols_delete_array_of_keys(self):
 
1165
        self._test_cols(
 
1166
            self.hashcol.delete(postgresql.array(['foo', 'bar'])),
 
1167
            ("delete(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
 
1168
             "AS delete_1"),
 
1169
            True
 
1170
        )
 
1171
 
 
1172
    def test_cols_delete_matching_pairs(self):
 
1173
        self._test_cols(
 
1174
            self.hashcol.delete(hstore('1', '2')),
 
1175
            ("delete(test_table.hash, hstore(%(param_1)s, %(param_2)s)) "
 
1176
             "AS delete_1"),
 
1177
            True
 
1178
        )
 
1179
 
 
1180
    def test_cols_slice(self):
 
1181
        self._test_cols(
 
1182
            self.hashcol.slice(postgresql.array(['1', '2'])),
 
1183
            ("slice(test_table.hash, ARRAY[%(param_1)s, %(param_2)s]) "
 
1184
             "AS slice_1"),
 
1185
            True
 
1186
        )
 
1187
 
 
1188
    def test_cols_hstore_pair_text(self):
 
1189
        self._test_cols(
 
1190
            hstore('foo', '3')['foo'],
 
1191
            "hstore(%(param_1)s, %(param_2)s) -> %(hstore_1)s AS anon_1",
 
1192
            False
 
1193
        )
 
1194
 
 
1195
    def test_cols_hstore_pair_array(self):
 
1196
        self._test_cols(
 
1197
            hstore(postgresql.array(['1', '2']),
 
1198
                   postgresql.array(['3', None]))['1'],
 
1199
            ("hstore(ARRAY[%(param_1)s, %(param_2)s], "
 
1200
             "ARRAY[%(param_3)s, NULL]) -> %(hstore_1)s AS anon_1"),
 
1201
            False
 
1202
        )
 
1203
 
 
1204
    def test_cols_hstore_single_array(self):
 
1205
        self._test_cols(
 
1206
            hstore(postgresql.array(['1', '2', '3', None]))['3'],
 
1207
            ("hstore(ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, NULL]) "
 
1208
             "-> %(hstore_1)s AS anon_1"),
 
1209
            False
 
1210
        )
 
1211
 
 
1212
    def test_cols_concat(self):
 
1213
        self._test_cols(
 
1214
            self.hashcol.concat(hstore(cast(self.test_table.c.id, Text), '3')),
 
1215
            ("test_table.hash || hstore(CAST(test_table.id AS TEXT), "
 
1216
             "%(param_1)s) AS anon_1"),
 
1217
            True
 
1218
        )
 
1219
 
 
1220
    def test_cols_concat_op(self):
 
1221
        self._test_cols(
 
1222
            hstore('foo', 'bar') + self.hashcol,
 
1223
            "hstore(%(param_1)s, %(param_2)s) || test_table.hash AS anon_1",
 
1224
            True
 
1225
        )
 
1226
 
 
1227
    def test_cols_concat_get(self):
 
1228
        self._test_cols(
 
1229
            (self.hashcol + self.hashcol)['foo'],
 
1230
            "test_table.hash || test_table.hash -> %(param_1)s AS anon_1"
 
1231
        )
 
1232
 
 
1233
    def test_cols_keys(self):
 
1234
        self._test_cols(
 
1235
            # hide from 2to3
 
1236
            getattr(self.hashcol, 'keys')(),
 
1237
            "akeys(test_table.hash) AS akeys_1",
 
1238
            True
 
1239
        )
 
1240
 
 
1241
    def test_cols_vals(self):
 
1242
        self._test_cols(
 
1243
            self.hashcol.vals(),
 
1244
            "avals(test_table.hash) AS avals_1",
 
1245
            True
 
1246
        )
 
1247
 
 
1248
    def test_cols_array(self):
 
1249
        self._test_cols(
 
1250
            self.hashcol.array(),
 
1251
            "hstore_to_array(test_table.hash) AS hstore_to_array_1",
 
1252
            True
 
1253
        )
 
1254
 
 
1255
    def test_cols_matrix(self):
 
1256
        self._test_cols(
 
1257
            self.hashcol.matrix(),
 
1258
            "hstore_to_matrix(test_table.hash) AS hstore_to_matrix_1",
 
1259
            True
 
1260
        )
 
1261
 
 
1262
 
 
1263
class HStoreRoundTripTest(fixtures.TablesTest):
 
1264
    __requires__ = 'hstore',
 
1265
    __dialect__ = 'postgresql'
 
1266
 
 
1267
    @classmethod
 
1268
    def define_tables(cls, metadata):
 
1269
        Table('data_table', metadata,
 
1270
            Column('id', Integer, primary_key=True),
 
1271
            Column('name', String(30), nullable=False),
 
1272
            Column('data', HSTORE)
 
1273
        )
 
1274
 
 
1275
    def _fixture_data(self, engine):
 
1276
        data_table = self.tables.data_table
 
1277
        engine.execute(
 
1278
                data_table.insert(),
 
1279
                {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}},
 
1280
                {'name': 'r2', 'data': {"k1": "r2v1", "k2": "r2v2"}},
 
1281
                {'name': 'r3', 'data': {"k1": "r3v1", "k2": "r3v2"}},
 
1282
                {'name': 'r4', 'data': {"k1": "r4v1", "k2": "r4v2"}},
 
1283
                {'name': 'r5', 'data': {"k1": "r5v1", "k2": "r5v2"}},
 
1284
        )
 
1285
 
 
1286
    def _assert_data(self, compare):
 
1287
        data = testing.db.execute(
 
1288
            select([self.tables.data_table.c.data]).
 
1289
                order_by(self.tables.data_table.c.name)
 
1290
        ).fetchall()
 
1291
        eq_([d for d, in data], compare)
 
1292
 
 
1293
    def _test_insert(self, engine):
 
1294
        engine.execute(
 
1295
            self.tables.data_table.insert(),
 
1296
            {'name': 'r1', 'data': {"k1": "r1v1", "k2": "r1v2"}}
 
1297
        )
 
1298
        self._assert_data([{"k1": "r1v1", "k2": "r1v2"}])
 
1299
 
 
1300
    def _non_native_engine(self):
 
1301
        if testing.against("postgresql+psycopg2"):
 
1302
            engine = engines.testing_engine(options=dict(use_native_hstore=False))
 
1303
        else:
 
1304
            engine = testing.db
 
1305
        engine.connect()
 
1306
        return engine
 
1307
 
 
1308
    def test_reflect(self):
 
1309
        from sqlalchemy import inspect
 
1310
        insp = inspect(testing.db)
 
1311
        cols = insp.get_columns('data_table')
 
1312
        assert isinstance(cols[2]['type'], HSTORE)
 
1313
 
 
1314
    @testing.only_on("postgresql+psycopg2")
 
1315
    def test_insert_native(self):
 
1316
        engine = testing.db
 
1317
        self._test_insert(engine)
 
1318
 
 
1319
    def test_insert_python(self):
 
1320
        engine = self._non_native_engine()
 
1321
        self._test_insert(engine)
 
1322
 
 
1323
    @testing.only_on("postgresql+psycopg2")
 
1324
    def test_criterion_native(self):
 
1325
        engine = testing.db
 
1326
        self._fixture_data(engine)
 
1327
        self._test_criterion(engine)
 
1328
 
 
1329
    def test_criterion_python(self):
 
1330
        engine = self._non_native_engine()
 
1331
        self._fixture_data(engine)
 
1332
        self._test_criterion(engine)
 
1333
 
 
1334
    def _test_criterion(self, engine):
 
1335
        data_table = self.tables.data_table
 
1336
        result = engine.execute(
 
1337
            select([data_table.c.data]).where(data_table.c.data['k1'] == 'r3v1')
 
1338
        ).first()
 
1339
        eq_(result, ({'k1': 'r3v1', 'k2': 'r3v2'},))
 
1340
 
 
1341
    def _test_fixed_round_trip(self, engine):
 
1342
        s = select([
 
1343
                hstore(
 
1344
                    array(['key1', 'key2', 'key3']),
 
1345
                    array(['value1', 'value2', 'value3'])
 
1346
                )
 
1347
            ])
 
1348
        eq_(
 
1349
            engine.scalar(s),
 
1350
            {"key1": "value1", "key2": "value2", "key3": "value3"}
 
1351
        )
 
1352
 
 
1353
    def test_fixed_round_trip_python(self):
 
1354
        engine = self._non_native_engine()
 
1355
        self._test_fixed_round_trip(engine)
 
1356
 
 
1357
    @testing.only_on("postgresql+psycopg2")
 
1358
    def test_fixed_round_trip_native(self):
 
1359
        engine = testing.db
 
1360
        self._test_fixed_round_trip(engine)
 
1361
 
 
1362
    def _test_unicode_round_trip(self, engine):
 
1363
        s = select([
 
1364
            hstore(
 
1365
                array([util.u('réveillé'), util.u('drôle'), util.u('S’il')]),
 
1366
                array([util.u('réveillé'), util.u('drôle'), util.u('S’il')])
 
1367
            )
 
1368
        ])
 
1369
        eq_(
 
1370
            engine.scalar(s),
 
1371
            {
 
1372
                util.u('réveillé'): util.u('réveillé'),
 
1373
                util.u('drôle'): util.u('drôle'),
 
1374
                util.u('S’il'): util.u('S’il')
 
1375
            }
 
1376
        )
 
1377
 
 
1378
    def test_unicode_round_trip_python(self):
 
1379
        engine = self._non_native_engine()
 
1380
        self._test_unicode_round_trip(engine)
 
1381
 
 
1382
    @testing.only_on("postgresql+psycopg2")
 
1383
    def test_unicode_round_trip_native(self):
 
1384
        engine = testing.db
 
1385
        self._test_unicode_round_trip(engine)
 
1386
 
 
1387
    def test_escaped_quotes_round_trip_python(self):
 
1388
        engine = self._non_native_engine()
 
1389
        self._test_escaped_quotes_round_trip(engine)
 
1390
 
 
1391
    @testing.only_on("postgresql+psycopg2")
 
1392
    def test_escaped_quotes_round_trip_native(self):
 
1393
        engine = testing.db
 
1394
        self._test_escaped_quotes_round_trip(engine)
 
1395
 
 
1396
    def _test_escaped_quotes_round_trip(self, engine):
 
1397
        engine.execute(
 
1398
            self.tables.data_table.insert(),
 
1399
            {'name': 'r1', 'data': {r'key \"foo\"': r'value \"bar"\ xyz'}}
 
1400
        )
 
1401
        self._assert_data([{r'key \"foo\"': r'value \"bar"\ xyz'}])
 
1402
 
 
1403
class _RangeTypeMixin(object):
 
1404
    __requires__ = 'range_types',
 
1405
    __dialect__ = 'postgresql+psycopg2'
 
1406
 
 
1407
    @property
 
1408
    def extras(self):
 
1409
        # done this way so we don't get ImportErrors with
 
1410
        # older psycopg2 versions.
 
1411
        from psycopg2 import extras
 
1412
        return extras
 
1413
 
 
1414
    @classmethod
 
1415
    def define_tables(cls, metadata):
 
1416
        # no reason ranges shouldn't be primary keys,
 
1417
        # so lets just use them as such
 
1418
        table = Table('data_table', metadata,
 
1419
            Column('range', cls._col_type, primary_key=True),
 
1420
        )
 
1421
        cls.col = table.c.range
 
1422
 
 
1423
    def test_actual_type(self):
 
1424
        eq_(str(self._col_type()), self._col_str)
 
1425
 
 
1426
    def test_reflect(self):
 
1427
        from sqlalchemy import inspect
 
1428
        insp = inspect(testing.db)
 
1429
        cols = insp.get_columns('data_table')
 
1430
        assert isinstance(cols[0]['type'], self._col_type)
 
1431
 
 
1432
    def _assert_data(self):
 
1433
        data = testing.db.execute(
 
1434
            select([self.tables.data_table.c.range])
 
1435
        ).fetchall()
 
1436
        eq_(data, [(self._data_obj(), )])
 
1437
 
 
1438
    def test_insert_obj(self):
 
1439
        testing.db.engine.execute(
 
1440
            self.tables.data_table.insert(),
 
1441
            {'range': self._data_obj()}
 
1442
        )
 
1443
        self._assert_data()
 
1444
 
 
1445
    def test_insert_text(self):
 
1446
        testing.db.engine.execute(
 
1447
            self.tables.data_table.insert(),
 
1448
            {'range': self._data_str}
 
1449
        )
 
1450
        self._assert_data()
 
1451
 
 
1452
    # operator tests
 
1453
 
 
1454
    def _test_clause(self, colclause, expected):
 
1455
        dialect = postgresql.dialect()
 
1456
        compiled = str(colclause.compile(dialect=dialect))
 
1457
        eq_(compiled, expected)
 
1458
 
 
1459
    def test_where_equal(self):
 
1460
        self._test_clause(
 
1461
            self.col==self._data_str,
 
1462
            "data_table.range = %(range_1)s"
 
1463
        )
 
1464
 
 
1465
    def test_where_not_equal(self):
 
1466
        self._test_clause(
 
1467
            self.col!=self._data_str,
 
1468
            "data_table.range <> %(range_1)s"
 
1469
        )
 
1470
 
 
1471
    def test_where_less_than(self):
 
1472
        self._test_clause(
 
1473
            self.col < self._data_str,
 
1474
            "data_table.range < %(range_1)s"
 
1475
        )
 
1476
 
 
1477
    def test_where_greater_than(self):
 
1478
        self._test_clause(
 
1479
            self.col > self._data_str,
 
1480
            "data_table.range > %(range_1)s"
 
1481
        )
 
1482
 
 
1483
    def test_where_less_than_or_equal(self):
 
1484
        self._test_clause(
 
1485
            self.col <= self._data_str,
 
1486
            "data_table.range <= %(range_1)s"
 
1487
        )
 
1488
 
 
1489
    def test_where_greater_than_or_equal(self):
 
1490
        self._test_clause(
 
1491
            self.col >= self._data_str,
 
1492
            "data_table.range >= %(range_1)s"
 
1493
        )
 
1494
 
 
1495
    def test_contains(self):
 
1496
        self._test_clause(
 
1497
            self.col.contains(self._data_str),
 
1498
            "data_table.range @> %(range_1)s"
 
1499
        )
 
1500
 
 
1501
    def test_contained_by(self):
 
1502
        self._test_clause(
 
1503
            self.col.contained_by(self._data_str),
 
1504
            "data_table.range <@ %(range_1)s"
 
1505
        )
 
1506
 
 
1507
    def test_overlaps(self):
 
1508
        self._test_clause(
 
1509
            self.col.overlaps(self._data_str),
 
1510
            "data_table.range && %(range_1)s"
 
1511
        )
 
1512
 
 
1513
    def test_strictly_left_of(self):
 
1514
        self._test_clause(
 
1515
            self.col << self._data_str,
 
1516
            "data_table.range << %(range_1)s"
 
1517
        )
 
1518
        self._test_clause(
 
1519
            self.col.strictly_left_of(self._data_str),
 
1520
            "data_table.range << %(range_1)s"
 
1521
        )
 
1522
 
 
1523
    def test_strictly_right_of(self):
 
1524
        self._test_clause(
 
1525
            self.col >> self._data_str,
 
1526
            "data_table.range >> %(range_1)s"
 
1527
        )
 
1528
        self._test_clause(
 
1529
            self.col.strictly_right_of(self._data_str),
 
1530
            "data_table.range >> %(range_1)s"
 
1531
        )
 
1532
 
 
1533
    def test_not_extend_right_of(self):
 
1534
        self._test_clause(
 
1535
            self.col.not_extend_right_of(self._data_str),
 
1536
            "data_table.range &< %(range_1)s"
 
1537
        )
 
1538
 
 
1539
    def test_not_extend_left_of(self):
 
1540
        self._test_clause(
 
1541
            self.col.not_extend_left_of(self._data_str),
 
1542
            "data_table.range &> %(range_1)s"
 
1543
        )
 
1544
 
 
1545
    def test_adjacent_to(self):
 
1546
        self._test_clause(
 
1547
            self.col.adjacent_to(self._data_str),
 
1548
            "data_table.range -|- %(range_1)s"
 
1549
        )
 
1550
 
 
1551
    def test_union(self):
 
1552
        self._test_clause(
 
1553
            self.col + self.col,
 
1554
            "data_table.range + data_table.range"
 
1555
        )
 
1556
 
 
1557
    def test_union_result(self):
 
1558
        # insert
 
1559
        testing.db.engine.execute(
 
1560
            self.tables.data_table.insert(),
 
1561
            {'range': self._data_str}
 
1562
        )
 
1563
        # select
 
1564
        range = self.tables.data_table.c.range
 
1565
        data = testing.db.execute(
 
1566
            select([range + range])
 
1567
            ).fetchall()
 
1568
        eq_(data, [(self._data_obj(), )])
 
1569
 
 
1570
 
 
1571
    def test_intersection(self):
 
1572
        self._test_clause(
 
1573
            self.col * self.col,
 
1574
            "data_table.range * data_table.range"
 
1575
        )
 
1576
 
 
1577
    def test_intersection_result(self):
 
1578
        # insert
 
1579
        testing.db.engine.execute(
 
1580
            self.tables.data_table.insert(),
 
1581
            {'range': self._data_str}
 
1582
        )
 
1583
        # select
 
1584
        range = self.tables.data_table.c.range
 
1585
        data = testing.db.execute(
 
1586
            select([range * range])
 
1587
            ).fetchall()
 
1588
        eq_(data, [(self._data_obj(), )])
 
1589
 
 
1590
    def test_different(self):
 
1591
        self._test_clause(
 
1592
            self.col - self.col,
 
1593
            "data_table.range - data_table.range"
 
1594
        )
 
1595
 
 
1596
    def test_difference_result(self):
 
1597
        # insert
 
1598
        testing.db.engine.execute(
 
1599
            self.tables.data_table.insert(),
 
1600
            {'range': self._data_str}
 
1601
        )
 
1602
        # select
 
1603
        range = self.tables.data_table.c.range
 
1604
        data = testing.db.execute(
 
1605
            select([range - range])
 
1606
            ).fetchall()
 
1607
        eq_(data, [(self._data_obj().__class__(empty=True), )])
 
1608
 
 
1609
class Int4RangeTests(_RangeTypeMixin, fixtures.TablesTest):
 
1610
 
 
1611
    _col_type = INT4RANGE
 
1612
    _col_str = 'INT4RANGE'
 
1613
    _data_str = '[1,2)'
 
1614
    def _data_obj(self):
 
1615
        return self.extras.NumericRange(1, 2)
 
1616
 
 
1617
class Int8RangeTests(_RangeTypeMixin, fixtures.TablesTest):
 
1618
 
 
1619
    _col_type = INT8RANGE
 
1620
    _col_str = 'INT8RANGE'
 
1621
    _data_str = '[9223372036854775806,9223372036854775807)'
 
1622
    def _data_obj(self):
 
1623
        return self.extras.NumericRange(
 
1624
            9223372036854775806, 9223372036854775807
 
1625
            )
 
1626
 
 
1627
class NumRangeTests(_RangeTypeMixin, fixtures.TablesTest):
 
1628
 
 
1629
    _col_type = NUMRANGE
 
1630
    _col_str = 'NUMRANGE'
 
1631
    _data_str = '[1.0,2.0)'
 
1632
    def _data_obj(self):
 
1633
        return self.extras.NumericRange(
 
1634
            decimal.Decimal('1.0'), decimal.Decimal('2.0')
 
1635
            )
 
1636
 
 
1637
class DateRangeTests(_RangeTypeMixin, fixtures.TablesTest):
 
1638
 
 
1639
    _col_type = DATERANGE
 
1640
    _col_str = 'DATERANGE'
 
1641
    _data_str = '[2013-03-23,2013-03-24)'
 
1642
    def _data_obj(self):
 
1643
        return self.extras.DateRange(
 
1644
            datetime.date(2013, 3, 23), datetime.date(2013, 3, 24)
 
1645
            )
 
1646
 
 
1647
class DateTimeRangeTests(_RangeTypeMixin, fixtures.TablesTest):
 
1648
 
 
1649
    _col_type = TSRANGE
 
1650
    _col_str = 'TSRANGE'
 
1651
    _data_str = '[2013-03-23 14:30,2013-03-23 23:30)'
 
1652
    def _data_obj(self):
 
1653
        return self.extras.DateTimeRange(
 
1654
            datetime.datetime(2013, 3, 23, 14, 30),
 
1655
            datetime.datetime(2013, 3, 23, 23, 30)
 
1656
            )
 
1657
 
 
1658
class DateTimeTZRangeTests(_RangeTypeMixin, fixtures.TablesTest):
 
1659
 
 
1660
    _col_type = TSTZRANGE
 
1661
    _col_str = 'TSTZRANGE'
 
1662
 
 
1663
    # make sure we use one, steady timestamp with timezone pair
 
1664
    # for all parts of all these tests
 
1665
    _tstzs = None
 
1666
    def tstzs(self):
 
1667
        if self._tstzs is None:
 
1668
            lower = testing.db.connect().scalar(
 
1669
                func.current_timestamp().select()
 
1670
                )
 
1671
            upper = lower+datetime.timedelta(1)
 
1672
            self._tstzs = (lower, upper)
 
1673
        return self._tstzs
 
1674
 
 
1675
    @property
 
1676
    def _data_str(self):
 
1677
        return '[%s,%s)' % self.tstzs()
 
1678
 
 
1679
    def _data_obj(self):
 
1680
        return self.extras.DateTimeTZRange(*self.tstzs())