3
# test_cursor.py - unit test for cursor attributes
5
# Copyright (C) 2010-2011 Daniele Varrazzo <daniele.varrazzo@gmail.com>
7
# psycopg2 is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU Lesser General Public License as published
9
# by the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
12
# In addition, as a special exception, the copyright holders give
13
# permission to link this program with the OpenSSL library (or with
14
# modified versions of OpenSSL that use the same license as OpenSSL),
15
# and distribute linked combinations including the two.
17
# You must obey the GNU Lesser General Public License in all respects for
18
# all of the code used other than OpenSSL.
20
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
21
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
22
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
23
# License for more details.
27
import psycopg2.extensions
28
from psycopg2.extensions import b
29
from testconfig import dsn
30
from testutils import unittest, skip_before_postgres, skip_if_no_namedtuple
32
class CursorTests(unittest.TestCase):
35
self.conn = psycopg2.connect(dsn)
40
def test_empty_query(self):
    # Executing an empty, blank or bare-semicolon statement must raise
    # ProgrammingError.
    cursor = self.conn.cursor()
    for statement in ("", " ", ";"):
        self.assertRaises(psycopg2.ProgrammingError,
                          cursor.execute, statement)
46
def test_executemany_propagate_exceptions(self):
    # An exception raised by the arguments iterable passed to
    # executemany() must reach the caller.
    cur = self.conn.cursor()
    cur.execute("create temp table test_exc (data int);")

    def buggygen():
        # raises ZeroDivisionError as soon as the first item is requested
        yield 1 // 0

    self.assertRaises(ZeroDivisionError,
                      cur.executemany,
                      "insert into test_exc values (%s)", buggygen())
    cur.close()
56
def test_mogrify_unicode(self):
    conn = self.conn
    cur = conn.cursor()

    # test consistency between execute and mogrify.

    # unicode query containing only ascii data
    cur.execute(u"SELECT 'foo';")
    self.assertEqual('foo', cur.fetchone()[0])
    self.assertEqual(b("SELECT 'foo';"), cur.mogrify(u"SELECT 'foo';"))

    conn.set_client_encoding('UTF8')
    # a single non-ascii character (U+2603) used as test payload
    snowman = u"\u2603"
    # utf8-encoded statement every mogrify() call below is compared with
    expected = ("SELECT '%s';" % snowman).encode('utf8')

    # unicode query with non-ascii data
    cur.execute(u"SELECT '%s';" % snowman)
    self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
    # any E'' string prefix in the mogrified output is stripped before
    # comparing, here and in the checks below
    self.assertEqual(
        expected,
        cur.mogrify(u"SELECT '%s';" % snowman).replace(b("E'"), b("'")))

    # unicode args
    cur.execute("SELECT %s;", (snowman,))
    self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
    self.assertEqual(
        expected,
        cur.mogrify("SELECT %s;", (snowman,)).replace(b("E'"), b("'")))

    # unicode query and args
    cur.execute(u"SELECT %s;", (snowman,))
    self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
    self.assertEqual(
        expected,
        cur.mogrify(u"SELECT %s;", (snowman,)).replace(b("E'"), b("'")))
88
def test_mogrify_decimal_explodes(self):
    # issue #7: explodes on windows with python 2.5 and psycopg 2.2.2
    from decimal import Decimal

    cur = self.conn.cursor()
    # a Decimal argument must be merged into the query as a plain
    # numeric literal
    self.assertEqual(b('SELECT 10.3;'),
                     cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
100
def test_bad_placeholder(self):
    # Every query below contains a malformed named placeholder; mogrify()
    # must reject it whether or not the mapping supplies the names.
    malformed = (
        ("select %(foo", {}),
        ("select %(foo", {'foo': 1}),
        ("select %(foo, %(bar)", {'foo': 1}),
        ("select %(foo, %(bar)", {'foo': 1, 'bar': 2}),
    )
    cursor = self.conn.cursor()
    for query, args in malformed:
        self.assertRaises(psycopg2.ProgrammingError,
                          cursor.mogrify, query, args)
112
curs = self.conn.cursor()
114
self.assertEqual(42, curs.cast(20, '42'))
115
self.assertAlmostEqual(3.14, curs.cast(700, '3.14'))
118
from decimal import Decimal
120
self.assertAlmostEqual(123.45, curs.cast(1700, '123.45'))
122
self.assertEqual(Decimal('123.45'), curs.cast(1700, '123.45'))
124
from datetime import date
125
self.assertEqual(date(2011,1,2), curs.cast(1082, '2011-01-02'))
126
self.assertEqual("who am i?", curs.cast(705, 'who am i?')) # unknown
128
def test_cast_specificity(self):
    # cast() for type code 705 must use the typecaster registered on the
    # cursor, else the one registered on the connection, else the default.
    ext = psycopg2.extensions
    cursor = self.conn.cursor()
    self.assertEqual("foo", cursor.cast(705, 'foo'))

    # registered on the connection: picked up by the existing cursor
    doubling = ext.new_type((705,), "DOUBLING", lambda value, _c: value * 2)
    ext.register_type(doubling, self.conn)
    self.assertEqual("foofoo", cursor.cast(705, 'foo'))

    # registered on the cursor: takes precedence over the connection's
    trebling = ext.new_type((705,), "TREBLING", lambda value, _c: value * 3)
    ext.register_type(trebling, cursor)
    self.assertEqual("foofoofoo", cursor.cast(705, 'foo'))

    # another cursor of the same connection still gets the doubling caster
    sibling = self.conn.cursor()
    self.assertEqual("foofoo", sibling.cast(705, 'foo'))
143
def test_weakref(self):
    # A cursor must support weak references, and the weak reference must
    # be cleared once the cursor itself is gone.
    from weakref import ref
    curs = self.conn.cursor()
    w = ref(curs)
    # drop the only strong reference held by this test
    del curs
    self.assertTrue(w() is None)
150
def test_invalid_name(self):
    # A named cursor must work even when its name contains dashes,
    # spaces, a backslash and double quotes.
    curs = self.conn.cursor()
    curs.execute("create temp table invname (data int);")
    # rows the named cursor is expected to read back, in order
    for i in (10, 20, 30):
        curs.execute("insert into invname values (%s)", (i,))
    curs.close()

    curs = self.conn.cursor(r'1-2-3 \ "test"')
    curs.execute("select data from invname order by data")
    self.assertEqual(curs.fetchall(), [(10,), (20,), (30,)])
161
@skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self):
    import time

    curs = self.conn.cursor('tmp')
    # if these records are fetched in the same roundtrip their
    # timestamp will not be influenced by the pause in Python world.
    curs.execute("""select clock_timestamp() from generate_series(1,2)""")
    i = iter(curs)
    t1 = (i.next())[0]  # the brackets work around a 2to3 bug
    # pause for longer than the 0.1s tolerance checked below
    time.sleep(0.2)
    t2 = (i.next())[0]
    delta = t2 - t1
    # use the whole delta: .microseconds alone ignores days and seconds
    elapsed = delta.days * 86400 + delta.seconds + delta.microseconds * 1e-6
    self.assertTrue(
        elapsed < 0.1,
        "named cursor records fetched in 2 roundtrips (delta: %s)"
        % delta)
175
@skip_before_postgres(8, 0)
def test_iter_named_cursor_default_itersize(self):
    named = self.conn.cursor('tmp')
    named.execute('select generate_series(1,50)')
    # pair each value with the cursor's rownumber at the time it is read
    seen = []
    for row in named:
        seen.append((row[0], named.rownumber))
    # everything swallowed in one gulp
    expected = [(n, n) for n in range(1, 51)]
    self.assertEqual(seen, expected)
183
@skip_before_postgres(8, 0)
def test_iter_named_cursor_itersize(self):
    # With itersize set, iteration fetches that many rows at a time and
    # rownumber is expected to restart from 1 on each batch.
    curs = self.conn.cursor('tmp')
    # 50 rows are read as one batch of 30 followed by one of 20
    curs.itersize = 30
    curs.execute('select generate_series(1,50)')
    rv = [(r[0], curs.rownumber) for r in curs]
    # everything swallowed in two gulps
    self.assertEqual(rv, [(i, ((i - 1) % 30) + 1) for i in range(1, 51)])
192
@skip_if_no_namedtuple
def test_namedtuple_description(self):
    # cursor.description entries must be 7-item sequences that also
    # expose their fields as named attributes.
    curs = self.conn.cursor()
    # three columns: a sized decimal, a text and a date
    curs.execute("""select
        3.14::decimal(10,2) as pi,
        'hello'::text as hi,
        '2010-02-18'::date as now;
        """)
    self.assertEqual(len(curs.description), 3)
    for c in curs.description:
        self.assertEqual(len(c), 7)  # DBAPI happy
        for a in ('name', 'type_code', 'display_size', 'internal_size',
                  'precision', 'scale', 'null_ok'):
            self.assertTrue(hasattr(c, a), a)

    # decimal(10,2): precision and scale are reported
    c = curs.description[0]
    self.assertEqual(c.name, 'pi')
    self.assertTrue(c.type_code in psycopg2.extensions.DECIMAL.values)
    self.assertTrue(c.internal_size > 0)
    self.assertEqual(c.precision, 10)
    self.assertEqual(c.scale, 2)

    # text: negative internal size, no precision/scale
    c = curs.description[1]
    self.assertEqual(c.name, 'hi')
    self.assertTrue(c.type_code in psycopg2.STRING.values)
    self.assertTrue(c.internal_size < 0)
    self.assertEqual(c.precision, None)
    self.assertEqual(c.scale, None)

    # date: positive internal size, no precision/scale
    c = curs.description[2]
    self.assertEqual(c.name, 'now')
    self.assertTrue(c.type_code in psycopg2.extensions.DATE.values)
    self.assertTrue(c.internal_size > 0)
    self.assertEqual(c.precision, None)
    self.assertEqual(c.scale, None)
230
return unittest.TestLoader().loadTestsFromName(__name__)
232
if __name__ == "__main__":