~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/dialect/postgresql/test_dialect.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# coding: utf-8
 
2
 
 
3
from sqlalchemy.testing.assertions import eq_, assert_raises, \
 
4
                assert_raises_message, AssertsExecutionResults, \
 
5
                AssertsCompiledSQL
 
6
from sqlalchemy.testing import engines, fixtures
 
7
from sqlalchemy import testing
 
8
import datetime
 
9
from sqlalchemy import Table, Column, select, MetaData, text, Integer, \
 
10
            String, Sequence, ForeignKey, join, Numeric, \
 
11
            PrimaryKeyConstraint, DateTime, tuple_, Float, BigInteger, \
 
12
            func, literal_column, literal, bindparam, cast, extract, \
 
13
            SmallInteger, Enum, REAL, update, insert, Index, delete, \
 
14
            and_, Date, TypeDecorator, Time, Unicode, Interval, or_, Text
 
15
from sqlalchemy import exc, schema
 
16
from sqlalchemy.dialects.postgresql import base as postgresql
 
17
import logging
 
18
import logging.handlers
 
19
from sqlalchemy.testing.mock import Mock
 
20
 
 
21
class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
    """Assorted PostgreSQL dialect tests: type reflection, server version
    parsing, psycopg2-specific connection behavior (encoding, isolation
    level, notice logging), sequence handling, schema round trips, and
    SERIAL DDL generation."""

    # Every test in this class requires a live PostgreSQL backend.
    __only_on__ = 'postgresql'
 
24
 
 
25
    @testing.provide_metadata
 
26
    def test_date_reflection(self):
 
27
        metadata = self.metadata
 
28
        t1 = Table('pgdate', metadata, Column('date1',
 
29
                   DateTime(timezone=True)), Column('date2',
 
30
                   DateTime(timezone=False)))
 
31
        metadata.create_all()
 
32
        m2 = MetaData(testing.db)
 
33
        t2 = Table('pgdate', m2, autoload=True)
 
34
        assert t2.c.date1.type.timezone is True
 
35
        assert t2.c.date2.type.timezone is False
 
36
 
 
37
    @testing.fails_on('+zxjdbc',
 
38
                      'The JDBC driver handles the version parsing')
 
39
    def test_version_parsing(self):
 
40
 
 
41
        def mock_conn(res):
 
42
            return Mock(
 
43
                    execute=Mock(
 
44
                            return_value=Mock(scalar=Mock(return_value=res))
 
45
                        )
 
46
                    )
 
47
 
 
48
        for string, version in \
 
49
            [('PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by '
 
50
             'GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)', (8, 3,
 
51
             8)),
 
52
             ('PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, '
 
53
             'compiled by GCC gcc (GCC) 4.4.2, 64-bit', (8, 5)),
 
54
             ('EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, '
 
55
             'compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), '
 
56
             '64-bit', (9, 1, 2))]:
 
57
            eq_(testing.db.dialect._get_server_version_info(mock_conn(string)),
 
58
                version)
 
59
 
 
60
    @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
 
61
    def test_psycopg2_version(self):
 
62
        v = testing.db.dialect.psycopg2_version
 
63
        assert testing.db.dialect.dbapi.__version__.\
 
64
                    startswith(".".join(str(x) for x in v))
 
65
 
 
66
    @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
 
67
    def test_notice_logging(self):
 
68
        log = logging.getLogger('sqlalchemy.dialects.postgresql')
 
69
        buf = logging.handlers.BufferingHandler(100)
 
70
        lev = log.level
 
71
        log.addHandler(buf)
 
72
        log.setLevel(logging.INFO)
 
73
        try:
 
74
            conn = testing.db.connect()
 
75
            trans = conn.begin()
 
76
            try:
 
77
                conn.execute('create table foo (id serial primary key)')
 
78
            finally:
 
79
                trans.rollback()
 
80
        finally:
 
81
            log.removeHandler(buf)
 
82
            log.setLevel(lev)
 
83
        msgs = ' '.join(b.msg for b in buf.buffer)
 
84
        assert 'will create implicit sequence' in msgs
 
85
        assert 'will create implicit index' in msgs
 
86
 
 
87
    @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
 
88
    @engines.close_open_connections
 
89
    def test_client_encoding(self):
 
90
        c = testing.db.connect()
 
91
        current_encoding = c.connection.connection.encoding
 
92
        c.close()
 
93
 
 
94
        # attempt to use an encoding that's not
 
95
        # already set
 
96
        if current_encoding == 'UTF8':
 
97
            test_encoding = 'LATIN1'
 
98
        else:
 
99
            test_encoding = 'UTF8'
 
100
 
 
101
        e = engines.testing_engine(
 
102
                        options={'client_encoding':test_encoding}
 
103
                    )
 
104
        c = e.connect()
 
105
        eq_(c.connection.connection.encoding, test_encoding)
 
106
 
 
107
    @testing.only_on('postgresql+psycopg2', 'psycopg2-specific feature')
 
108
    @engines.close_open_connections
 
109
    def test_autocommit_isolation_level(self):
 
110
        extensions = __import__('psycopg2.extensions').extensions
 
111
 
 
112
        c = testing.db.connect()
 
113
        c = c.execution_options(isolation_level='AUTOCOMMIT')
 
114
        eq_(c.connection.connection.isolation_level,
 
115
            extensions.ISOLATION_LEVEL_AUTOCOMMIT)
 
116
 
 
117
    @testing.fails_on('+zxjdbc',
 
118
                      "Can't infer the SQL type to use for an instance "
 
119
                      "of org.python.core.PyObjectDerived.")
 
120
    @testing.fails_on('+pg8000', "Can't determine correct type.")
 
121
    def test_extract(self):
 
122
        fivedaysago = datetime.datetime.now() \
 
123
            - datetime.timedelta(days=5)
 
124
        for field, exp in ('year', fivedaysago.year), ('month',
 
125
                fivedaysago.month), ('day', fivedaysago.day):
 
126
            r = testing.db.execute(select([extract(field, func.now()
 
127
                                   + datetime.timedelta(days=-5))])).scalar()
 
128
            eq_(r, exp)
 
129
 
 
130
    def test_checksfor_sequence(self):
 
131
        meta1 = MetaData(testing.db)
 
132
        seq = Sequence('fooseq')
 
133
        t = Table('mytable', meta1, Column('col1', Integer,
 
134
                  seq))
 
135
        seq.drop()
 
136
        try:
 
137
            testing.db.execute('CREATE SEQUENCE fooseq')
 
138
            t.create(checkfirst=True)
 
139
        finally:
 
140
            t.drop(checkfirst=True)
 
141
 
 
142
    def test_schema_roundtrips(self):
        """INSERT / SELECT / UPDATE / DELETE round trips against a table
        that lives in a non-default schema (``test_schema``)."""
        meta = MetaData(testing.db)
        users = Table('users', meta, Column('id', Integer,
                      primary_key=True), Column('name', String(50)),
                      schema='test_schema')
        users.create()
        try:
            # Seed four rows to exercise each DML statement below.
            users.insert().execute(id=1, name='name1')
            users.insert().execute(id=2, name='name2')
            users.insert().execute(id=3, name='name3')
            users.insert().execute(id=4, name='name4')
            # Plain and labeled SELECTs must both find the row.
            eq_(users.select().where(users.c.name == 'name2'
                ).execute().fetchall(), [(2, 'name2')])
            eq_(users.select(use_labels=True).where(users.c.name
                == 'name2').execute().fetchall(), [(2, 'name2')])
            # DELETE removes the row; the follow-up SELECT sees nothing.
            users.delete().where(users.c.id == 3).execute()
            eq_(users.select().where(users.c.name == 'name3'
                ).execute().fetchall(), [])
            # UPDATE is visible through a labeled SELECT.
            users.update().where(users.c.name == 'name4'
                                 ).execute(name='newname')
            eq_(users.select(use_labels=True).where(users.c.id
                == 4).execute().fetchall(), [(4, 'newname')])
        finally:
            users.drop()
 
166
 
 
167
    def test_preexecute_passivedefault(self):
        """test that when we get a primary key column back from
        reflecting a table which has a default value on it, we pre-
        execute that DefaultClause upon insert."""

        try:
            meta = MetaData(testing.db)
            # Table is created with raw DDL so the SERIAL default comes
            # purely from reflection, not from the Table definition.
            testing.db.execute("""
             CREATE TABLE speedy_users
             (
                 speedy_user_id   SERIAL     PRIMARY KEY,

                 user_name        VARCHAR    NOT NULL,
                 user_password    VARCHAR    NOT NULL
             );
            """)
            t = Table('speedy_users', meta, autoload=True)
            r = t.insert().execute(user_name='user',
                                   user_password='lala')
            # The generated primary key must be available on the result.
            assert r.inserted_primary_key == [1]
            l = t.select().execute().fetchall()
            assert l == [(1, 'user', 'lala')]
        finally:
            testing.db.execute('drop table speedy_users')
 
191
 
 
192
 
 
193
    @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion')
 
194
    @testing.fails_on('pypostgresql',
 
195
                      'psycopg2/pg8000 specific assertion')
 
196
    def test_numeric_raise(self):
 
197
        stmt = text("select cast('hi' as char) as hi", typemap={'hi'
 
198
                    : Numeric})
 
199
        assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)
 
200
 
 
201
    def test_serial_integer(self):
 
202
        for type_, expected in [
 
203
            (Integer, 'SERIAL'),
 
204
            (BigInteger, 'BIGSERIAL'),
 
205
            (SmallInteger, 'SMALLINT'),
 
206
            (postgresql.INTEGER, 'SERIAL'),
 
207
            (postgresql.BIGINT, 'BIGSERIAL'),
 
208
        ]:
 
209
            m = MetaData()
 
210
 
 
211
            t = Table('t', m, Column('c', type_, primary_key=True))
 
212
            ddl_compiler = testing.db.dialect.ddl_compiler(testing.db.dialect, schema.CreateTable(t))
 
213
            eq_(
 
214
                ddl_compiler.get_column_specification(t.c.c),
 
215
                "c %s NOT NULL" % expected
 
216
            )