~james-w/ubuntu/lucid/psycopg2/precise-backport

« back to all changes in this revision

Viewing changes to tests/testutils.py

  • Committer: Bazaar Package Importer
  • Author(s): Fabio Tranchitella, Jakub Wilk, Fabio Tranchitella
  • Date: 2011-06-19 18:25:53 UTC
  • mfrom: (5.1.10 sid)
  • Revision ID: james.westby@ubuntu.com-20110619182553-uye7z0g5ewab98px
Tags: 2.4.2-1
[ Jakub Wilk ]
* Add Debian Python Modules Team to Uploaders.

[ Fabio Tranchitella ]
* New upstream release.
* debian/watch: updated, use pypi.
* debian/control, debian/rules: switched to dh_python2.
* debian/control: bumped Standard-Version to 3.9.2, no changes required.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# testutils.py - utility module for psycopg2 testing.
 
2
 
 
3
#
 
4
# Copyright (C) 2010-2011 Daniele Varrazzo  <daniele.varrazzo@gmail.com>
 
5
#
 
6
# psycopg2 is free software: you can redistribute it and/or modify it
 
7
# under the terms of the GNU Lesser General Public License as published
 
8
# by the Free Software Foundation, either version 3 of the License, or
 
9
# (at your option) any later version.
 
10
#
 
11
# In addition, as a special exception, the copyright holders give
 
12
# permission to link this program with the OpenSSL library (or with
 
13
# modified versions of OpenSSL that use the same license as OpenSSL),
 
14
# and distribute linked combinations including the two.
 
15
#
 
16
# You must obey the GNU Lesser General Public License in all respects for
 
17
# all of the code used other than OpenSSL.
 
18
#
 
19
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
 
20
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 
21
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
 
22
# License for more details.
 
23
 
 
24
 
 
25
# Use unittest2 if available. Otherwise mock a skip facility with warnings.
 
26
 
 
27
import os
 
28
import sys
 
29
 
 
30
try:
 
31
    import unittest2
 
32
    unittest = unittest2
 
33
except ImportError:
 
34
    import unittest
 
35
    unittest2 = None
 
36
 
 
37
if hasattr(unittest, 'skipIf'):
 
38
    skip = unittest.skip
 
39
    skipIf = unittest.skipIf
 
40
 
 
41
else:
 
42
    import warnings
 
43
 
 
44
    def skipIf(cond, msg):
 
45
        def skipIf_(f):
 
46
            def skipIf__(self):
 
47
                if cond:
 
48
                    warnings.warn(msg)
 
49
                    return
 
50
                else:
 
51
                    return f(self)
 
52
            return skipIf__
 
53
        return skipIf_
 
54
 
 
55
    def skip(msg):
 
56
        return skipIf(True, msg)
 
57
 
 
58
    def skipTest(self, msg):
 
59
        warnings.warn(msg)
 
60
        return
 
61
 
 
62
    unittest.TestCase.skipTest = skipTest
 
63
 
 
64
# Silence warnings caused by the stubborness of the Python unittest maintainers
 
65
# http://bugs.python.org/issue9424
 
66
if not hasattr(unittest.TestCase, 'assert_') \
 
67
or unittest.TestCase.assert_ is not unittest.TestCase.assertTrue:
 
68
    # mavaff...
 
69
    unittest.TestCase.assert_ = unittest.TestCase.assertTrue
 
70
    unittest.TestCase.failUnless = unittest.TestCase.assertTrue
 
71
    unittest.TestCase.assertEquals = unittest.TestCase.assertEqual
 
72
    unittest.TestCase.failUnlessEqual = unittest.TestCase.assertEqual
 
73
 
 
74
 
 
75
def decorate_all_tests(cls, decorator):
 
76
    """Apply *decorator* to all the tests defined in the TestCase *cls*."""
 
77
    for n in dir(cls):
 
78
        if n.startswith('test'):
 
79
            setattr(cls, n, decorator(getattr(cls, n)))
 
80
 
 
81
 
 
82
def skip_if_no_uuid(f):
 
83
    """Decorator to skip a test if uuid is not supported by Py/PG."""
 
84
    def skip_if_no_uuid_(self):
 
85
        try:
 
86
            import uuid
 
87
        except ImportError:
 
88
            return self.skipTest("uuid not available in this Python version")
 
89
 
 
90
        try:
 
91
            cur = self.conn.cursor()
 
92
            cur.execute("select typname from pg_type where typname = 'uuid'")
 
93
            has = cur.fetchone()
 
94
        finally:
 
95
            self.conn.rollback()
 
96
 
 
97
        if has:
 
98
            return f(self)
 
99
        else:
 
100
            return self.skipTest("uuid type not available on the server")
 
101
 
 
102
    return skip_if_no_uuid_
 
103
 
 
104
 
 
105
def skip_if_tpc_disabled(f):
 
106
    """Skip a test if the server has tpc support disabled."""
 
107
    def skip_if_tpc_disabled_(self):
 
108
        from psycopg2 import ProgrammingError
 
109
        cnn = self.connect()
 
110
        cur = cnn.cursor()
 
111
        try:
 
112
            cur.execute("SHOW max_prepared_transactions;")
 
113
        except ProgrammingError:
 
114
            return self.skipTest(
 
115
                "server too old: two phase transactions not supported.")
 
116
        else:
 
117
            mtp = int(cur.fetchone()[0])
 
118
        cnn.close()
 
119
 
 
120
        if not mtp:
 
121
            return self.skipTest(
 
122
                "server not configured for two phase transactions. "
 
123
                "set max_prepared_transactions to > 0 to run the test")
 
124
        return f(self)
 
125
 
 
126
    skip_if_tpc_disabled_.__name__ = f.__name__
 
127
    return skip_if_tpc_disabled_
 
128
 
 
129
 
 
130
def skip_if_no_namedtuple(f):
 
131
    def skip_if_no_namedtuple_(self):
 
132
        try:
 
133
            from collections import namedtuple
 
134
        except ImportError:
 
135
            return self.skipTest("collections.namedtuple not available")
 
136
        else:
 
137
            return f(self)
 
138
 
 
139
    skip_if_no_namedtuple_.__name__ = f.__name__
 
140
    return skip_if_no_namedtuple_
 
141
 
 
142
 
 
143
def skip_if_no_iobase(f):
 
144
    """Skip a test if io.TextIOBase is not available."""
 
145
    def skip_if_no_iobase_(self):
 
146
        try:
 
147
            from io import TextIOBase
 
148
        except ImportError:
 
149
            return self.skipTest("io.TextIOBase not found.")
 
150
        else:
 
151
            return f(self)
 
152
 
 
153
    return skip_if_no_iobase_
 
154
 
 
155
 
 
156
def skip_before_postgres(*ver):
 
157
    """Skip a test on PostgreSQL before a certain version."""
 
158
    ver = ver + (0,) * (3 - len(ver))
 
159
    def skip_before_postgres_(f):
 
160
        def skip_before_postgres__(self):
 
161
            if self.conn.server_version < int("%d%02d%02d" % ver):
 
162
                return self.skipTest("skipped because PostgreSQL %s"
 
163
                    % self.conn.server_version)
 
164
            else:
 
165
                return f(self)
 
166
 
 
167
        return skip_before_postgres__
 
168
    return skip_before_postgres_
 
169
 
 
170
def skip_after_postgres(*ver):
 
171
    """Skip a test on PostgreSQL after (including) a certain version."""
 
172
    ver = ver + (0,) * (3 - len(ver))
 
173
    def skip_after_postgres_(f):
 
174
        def skip_after_postgres__(self):
 
175
            if self.conn.server_version >= int("%d%02d%02d" % ver):
 
176
                return self.skipTest("skipped because PostgreSQL %s"
 
177
                    % self.conn.server_version)
 
178
            else:
 
179
                return f(self)
 
180
 
 
181
        return skip_after_postgres__
 
182
    return skip_after_postgres_
 
183
 
 
184
def skip_before_python(*ver):
 
185
    """Skip a test on Python before a certain version."""
 
186
    def skip_before_python_(f):
 
187
        def skip_before_python__(self):
 
188
            if sys.version_info[:len(ver)] < ver:
 
189
                return self.skipTest("skipped because Python %s"
 
190
                    % ".".join(map(str, sys.version_info[:len(ver)])))
 
191
            else:
 
192
                return f(self)
 
193
 
 
194
        return skip_before_python__
 
195
    return skip_before_python_
 
196
 
 
197
def skip_from_python(*ver):
 
198
    """Skip a test on Python after (including) a certain version."""
 
199
    def skip_from_python_(f):
 
200
        def skip_from_python__(self):
 
201
            if sys.version_info[:len(ver)] >= ver:
 
202
                return self.skipTest("skipped because Python %s"
 
203
                    % ".".join(map(str, sys.version_info[:len(ver)])))
 
204
            else:
 
205
                return f(self)
 
206
 
 
207
        return skip_from_python__
 
208
    return skip_from_python_
 
209
 
 
210
 
 
211
def script_to_py3(script):
 
212
    """Convert a script to Python3 syntax if required."""
 
213
    if sys.version_info[0] < 3:
 
214
        return script
 
215
 
 
216
    import tempfile
 
217
    f = tempfile.NamedTemporaryFile(suffix=".py", delete=False)
 
218
    f.write(script.encode())
 
219
    f.flush()
 
220
    filename = f.name
 
221
    f.close()
 
222
 
 
223
    # 2to3 is way too chatty
 
224
    import logging
 
225
    logging.basicConfig(filename=os.devnull)
 
226
 
 
227
    from lib2to3.main import main
 
228
    if main("lib2to3.fixes", ['--no-diffs', '-w', '-n', filename]):
 
229
        raise Exception('py3 conversion failed')
 
230
 
 
231
    f2 = open(filename)
 
232
    try:
 
233
        return f2.read()
 
234
    finally:
 
235
        f2.close()
 
236
        os.remove(filename)
 
237