~ubuntu-branches/debian/sid/python-django/sid

« back to all changes in this revision

Viewing changes to tests/backends/tests.py

  • Committer: Package Import Robot
  • Author(s): Raphaël Hertzog
  • Date: 2014-09-17 14:15:11 UTC
  • mfrom: (1.3.17) (6.2.18 experimental)
  • Revision ID: package-import@ubuntu.com-20140917141511-icneokthe9ww5sk4
Tags: 1.7-2
* Release to unstable.
* Add a migrate-south sample script to help users apply their South
  migrations. Thanks to Brian May.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# -*- coding: utf-8 -*-
2
2
# Unit and doctests for specific database backends.
3
 
from __future__ import absolute_import, unicode_literals
 
3
from __future__ import unicode_literals
4
4
 
5
5
import copy
6
6
import datetime
7
7
from decimal import Decimal
 
8
import re
8
9
import threading
 
10
import unittest
9
11
 
10
12
from django.conf import settings
 
13
from django.core.exceptions import ImproperlyConfigured
11
14
from django.core.management.color import no_style
12
15
from django.db import (connection, connections, DEFAULT_DB_ALIAS,
13
16
    DatabaseError, IntegrityError, transaction)
14
17
from django.db.backends.signals import connection_created
15
18
from django.db.backends.sqlite3.base import DatabaseOperations
16
19
from django.db.backends.postgresql_psycopg2 import version as pg_version
17
 
from django.db.backends.util import format_number
 
20
from django.db.backends.utils import format_number, CursorWrapper
18
21
from django.db.models import Sum, Avg, Variance, StdDev
19
22
from django.db.models.fields import (AutoField, DateField, DateTimeField,
20
23
    DecimalField, IntegerField, TimeField)
 
24
from django.db.models.sql.constants import CURSOR
21
25
from django.db.utils import ConnectionHandler
22
 
from django.test import (TestCase, skipUnlessDBFeature, skipIfDBFeature,
23
 
    TransactionTestCase)
24
 
from django.test.utils import override_settings, str_prefix
25
 
from django.utils import six, unittest
 
26
from django.test import (TestCase, TransactionTestCase, override_settings,
 
27
    skipUnlessDBFeature, skipIfDBFeature)
 
28
from django.test.utils import str_prefix, IgnoreAllDeprecationWarningsMixin
 
29
from django.utils import six
26
30
from django.utils.six.moves import xrange
27
31
 
28
32
from . import models
29
33
 
30
34
 
31
35
class DummyBackendTest(TestCase):
 
36
 
32
37
    def test_no_databases(self):
33
38
        """
34
39
        Test that empty DATABASES setting default to the dummy backend.
39
44
            'django.db.backends.dummy')
40
45
 
41
46
 
42
 
class OracleChecks(unittest.TestCase):
 
47
@unittest.skipUnless(connection.vendor == 'oracle', "Test only for Oracle")
 
48
class OracleTests(unittest.TestCase):
43
49
 
44
 
    @unittest.skipUnless(connection.vendor == 'oracle',
45
 
                         "No need to check Oracle quote_name semantics")
46
50
    def test_quote_name(self):
47
51
        # Check that '%' chars are escaped for query execution.
48
52
        name = '"SOME%NAME"'
49
53
        quoted_name = connection.ops.quote_name(name)
50
54
        self.assertEqual(quoted_name % (), name)
51
55
 
52
 
    @unittest.skipUnless(connection.vendor == 'oracle',
53
 
                         "No need to check Oracle cursor semantics")
54
56
    def test_dbms_session(self):
55
57
        # If the backend is Oracle, test that we can call a standard
56
58
        # stored procedure through our cursor wrapper.
57
59
        from django.db.backends.oracle.base import convert_unicode
58
60
 
59
 
        cursor = connection.cursor()
60
 
        cursor.callproc(convert_unicode('DBMS_SESSION.SET_IDENTIFIER'),
61
 
                        [convert_unicode('_django_testing!')])
 
61
        with connection.cursor() as cursor:
 
62
            cursor.callproc(convert_unicode('DBMS_SESSION.SET_IDENTIFIER'),
 
63
                            [convert_unicode('_django_testing!')])
62
64
 
63
 
    @unittest.skipUnless(connection.vendor == 'oracle',
64
 
                         "No need to check Oracle cursor semantics")
65
65
    def test_cursor_var(self):
66
66
        # If the backend is Oracle, test that we can pass cursor variables
67
67
        # as query parameters.
68
68
        from django.db.backends.oracle.base import Database
69
69
 
70
 
        cursor = connection.cursor()
71
 
        var = cursor.var(Database.STRING)
72
 
        cursor.execute("BEGIN %s := 'X'; END; ", [var])
73
 
        self.assertEqual(var.getvalue(), 'X')
 
70
        with connection.cursor() as cursor:
 
71
            var = cursor.var(Database.STRING)
 
72
            cursor.execute("BEGIN %s := 'X'; END; ", [var])
 
73
            self.assertEqual(var.getvalue(), 'X')
74
74
 
75
 
    @unittest.skipUnless(connection.vendor == 'oracle',
76
 
                         "No need to check Oracle cursor semantics")
77
75
    def test_long_string(self):
78
76
        # If the backend is Oracle, test that we can save a text longer
79
77
        # than 4000 chars and read it properly
80
 
        c = connection.cursor()
81
 
        c.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
82
 
        long_str = ''.join([six.text_type(x) for x in xrange(4000)])
83
 
        c.execute('INSERT INTO ltext VALUES (%s)', [long_str])
84
 
        c.execute('SELECT text FROM ltext')
85
 
        row = c.fetchone()
86
 
        self.assertEqual(long_str, row[0].read())
87
 
        c.execute('DROP TABLE ltext')
 
78
        with connection.cursor() as cursor:
 
79
            cursor.execute('CREATE TABLE ltext ("TEXT" NCLOB)')
 
80
            long_str = ''.join(six.text_type(x) for x in xrange(4000))
 
81
            cursor.execute('INSERT INTO ltext VALUES (%s)', [long_str])
 
82
            cursor.execute('SELECT text FROM ltext')
 
83
            row = cursor.fetchone()
 
84
            self.assertEqual(long_str, row[0].read())
 
85
            cursor.execute('DROP TABLE ltext')
88
86
 
89
 
    @unittest.skipUnless(connection.vendor == 'oracle',
90
 
                         "No need to check Oracle connection semantics")
91
87
    def test_client_encoding(self):
92
88
        # If the backend is Oracle, test that the client encoding is set
93
89
        # correctly.  This was broken under Cygwin prior to r14781.
94
 
        connection.cursor()  # Ensure the connection is initialized.
 
90
        connection.ensure_connection()
95
91
        self.assertEqual(connection.connection.encoding, "UTF-8")
96
92
        self.assertEqual(connection.connection.nencoding, "UTF-8")
97
93
 
98
 
    @unittest.skipUnless(connection.vendor == 'oracle',
99
 
                         "No need to check Oracle connection semantics")
100
94
    def test_order_of_nls_parameters(self):
101
95
        # an 'almost right' datetime should work with configured
102
96
        # NLS parameters as per #18465.
103
 
        c = connection.cursor()
104
 
        query = "select 1 from dual where '1936-12-29 00:00' < sysdate"
105
 
        # Test that the query succeeds without errors - pre #18465 this
106
 
        # wasn't the case.
107
 
        c.execute(query)
108
 
        self.assertEqual(c.fetchone()[0], 1)
109
 
 
110
 
 
 
97
        with connection.cursor() as cursor:
 
98
            query = "select 1 from dual where '1936-12-29 00:00' < sysdate"
 
99
            # Test that the query succeeds without errors - pre #18465 this
 
100
            # wasn't the case.
 
101
            cursor.execute(query)
 
102
            self.assertEqual(cursor.fetchone()[0], 1)
 
103
 
 
104
 
 
105
@unittest.skipUnless(connection.vendor == 'sqlite', "Test only for SQLite")
 
106
class SQLiteTests(TestCase):
 
107
 
 
108
    longMessage = True
 
109
 
 
110
    def test_autoincrement(self):
 
111
        """
 
112
        Check that auto_increment fields are created with the AUTOINCREMENT
 
113
        keyword in order to be monotonically increasing. Refs #10164.
 
114
        """
 
115
        statements = connection.creation.sql_create_model(models.Square,
 
116
            style=no_style())
 
117
        match = re.search('"id" ([^,]+),', statements[0][0])
 
118
        self.assertIsNotNone(match)
 
119
        self.assertEqual('integer NOT NULL PRIMARY KEY AUTOINCREMENT',
 
120
            match.group(1), "Wrong SQL used to create an auto-increment "
 
121
            "column on SQLite")
 
122
 
 
123
    def test_aggregation(self):
 
124
        """
 
125
        #19360: Raise NotImplementedError when aggregating on date/time fields.
 
126
        """
 
127
        for aggregate in (Sum, Avg, Variance, StdDev):
 
128
            self.assertRaises(NotImplementedError,
 
129
                models.Item.objects.all().aggregate, aggregate('time'))
 
130
            self.assertRaises(NotImplementedError,
 
131
                models.Item.objects.all().aggregate, aggregate('date'))
 
132
            self.assertRaises(NotImplementedError,
 
133
                models.Item.objects.all().aggregate, aggregate('last_modified'))
 
134
 
 
135
    def test_convert_values_to_handle_null_value(self):
 
136
        convert_values = DatabaseOperations(connection).convert_values
 
137
        self.assertIsNone(convert_values(None, AutoField(primary_key=True)))
 
138
        self.assertIsNone(convert_values(None, DateField()))
 
139
        self.assertIsNone(convert_values(None, DateTimeField()))
 
140
        self.assertIsNone(convert_values(None, DecimalField()))
 
141
        self.assertIsNone(convert_values(None, IntegerField()))
 
142
        self.assertIsNone(convert_values(None, TimeField()))
 
143
 
 
144
 
 
145
@unittest.skipUnless(connection.vendor == 'postgresql', "Test only for PostgreSQL")
 
146
class PostgreSQLTests(TestCase):
 
147
 
 
148
    def assert_parses(self, version_string, version):
 
149
        self.assertEqual(pg_version._parse_version(version_string), version)
 
150
 
 
151
    def test_parsing(self):
 
152
        """Test PostgreSQL version parsing from `SELECT version()` output"""
 
153
        self.assert_parses("PostgreSQL 8.3 beta4", 80300)
 
154
        self.assert_parses("PostgreSQL 8.3", 80300)
 
155
        self.assert_parses("EnterpriseDB 8.3", 80300)
 
156
        self.assert_parses("PostgreSQL 8.3.6", 80306)
 
157
        self.assert_parses("PostgreSQL 8.4beta1", 80400)
 
158
        self.assert_parses("PostgreSQL 8.3.1 on i386-apple-darwin9.2.2, compiled by GCC i686-apple-darwin9-gcc-4.0.1 (GCC) 4.0.1 (Apple Inc. build 5478)", 80301)
 
159
 
 
160
    def test_version_detection(self):
 
161
        """Test PostgreSQL version detection"""
 
162
 
 
163
        # Helper mocks
 
164
        class CursorMock(object):
 
165
            "Very simple mock of DB-API cursor"
 
166
            def execute(self, arg):
 
167
                pass
 
168
 
 
169
            def fetchone(self):
 
170
                return ["PostgreSQL 8.3"]
 
171
 
 
172
            def __enter__(self):
 
173
                return self
 
174
 
 
175
            def __exit__(self, type, value, traceback):
 
176
                pass
 
177
 
 
178
        class OlderConnectionMock(object):
 
179
            "Mock of psycopg2 (< 2.0.12) connection"
 
180
            def cursor(self):
 
181
                return CursorMock()
 
182
 
 
183
        # psycopg2 < 2.0.12 code path
 
184
        conn = OlderConnectionMock()
 
185
        self.assertEqual(pg_version.get_version(conn), 80300)
 
186
 
 
187
    def test_connect_and_rollback(self):
 
188
        """
 
189
        PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
 
190
        transaction is rolled back (#17062).
 
191
        """
 
192
        databases = copy.deepcopy(settings.DATABASES)
 
193
        new_connections = ConnectionHandler(databases)
 
194
        new_connection = new_connections[DEFAULT_DB_ALIAS]
 
195
        try:
 
196
            # Ensure the database default time zone is different than
 
197
            # the time zone in new_connection.settings_dict. We can
 
198
            # get the default time zone by reset & show.
 
199
            cursor = new_connection.cursor()
 
200
            cursor.execute("RESET TIMEZONE")
 
201
            cursor.execute("SHOW TIMEZONE")
 
202
            db_default_tz = cursor.fetchone()[0]
 
203
            new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
 
204
            new_connection.close()
 
205
 
 
206
            # Fetch a new connection with the new_tz as default
 
207
            # time zone, run a query and rollback.
 
208
            new_connection.settings_dict['TIME_ZONE'] = new_tz
 
209
            new_connection.enter_transaction_management()
 
210
            cursor = new_connection.cursor()
 
211
            new_connection.rollback()
 
212
 
 
213
            # Now let's see if the rollback rolled back the SET TIME ZONE.
 
214
            cursor.execute("SHOW TIMEZONE")
 
215
            tz = cursor.fetchone()[0]
 
216
            self.assertEqual(new_tz, tz)
 
217
        finally:
 
218
            new_connection.close()
 
219
 
 
220
    def test_connect_non_autocommit(self):
 
221
        """
 
222
        The connection wrapper shouldn't believe that autocommit is enabled
 
223
        after setting the time zone when AUTOCOMMIT is False (#21452).
 
224
        """
 
225
        databases = copy.deepcopy(settings.DATABASES)
 
226
        databases[DEFAULT_DB_ALIAS]['AUTOCOMMIT'] = False
 
227
        new_connections = ConnectionHandler(databases)
 
228
        new_connection = new_connections[DEFAULT_DB_ALIAS]
 
229
        try:
 
230
            # Open a database connection.
 
231
            new_connection.cursor()
 
232
            self.assertFalse(new_connection.get_autocommit())
 
233
        finally:
 
234
            new_connection.close()
 
235
 
 
236
    def _select(self, val):
 
237
        with connection.cursor() as cursor:
 
238
            cursor.execute("SELECT %s", (val,))
 
239
            return cursor.fetchone()[0]
 
240
 
 
241
    def test_select_ascii_array(self):
 
242
        a = ["awef"]
 
243
        b = self._select(a)
 
244
        self.assertEqual(a[0], b[0])
 
245
 
 
246
    def test_select_unicode_array(self):
 
247
        a = ["ᄲawef"]
 
248
        b = self._select(a)
 
249
        self.assertEqual(a[0], b[0])
 
250
 
 
251
    def test_lookup_cast(self):
 
252
        from django.db.backends.postgresql_psycopg2.operations import DatabaseOperations
 
253
 
 
254
        do = DatabaseOperations(connection=None)
 
255
        for lookup in ('iexact', 'contains', 'icontains', 'startswith',
 
256
                       'istartswith', 'endswith', 'iendswith', 'regex', 'iregex'):
 
257
            self.assertIn('::text', do.lookup_cast(lookup))
 
258
 
 
259
 
 
260
@unittest.skipUnless(connection.vendor == 'mysql', "Test only for MySQL")
111
261
class MySQLTests(TestCase):
112
 
    @unittest.skipUnless(connection.vendor == 'mysql',
113
 
                        "Test valid only for MySQL")
 
262
 
114
263
    def test_autoincrement(self):
115
264
        """
116
265
        Check that auto_increment fields are reset correctly by sql_flush().
186
335
        """
187
336
        Test that last_executed_query() returns an Unicode string
188
337
        """
189
 
        persons = models.Reporter.objects.filter(raw_data=b'\x00\x46  \xFE').extra(select={'föö': 1})
190
 
        sql, params = persons.query.sql_with_params()
191
 
        cursor = persons.query.get_compiler('default').execute_sql(None)
 
338
        data = models.RawData.objects.filter(raw_data=b'\x00\x46  \xFE').extra(select={'föö': 1})
 
339
        sql, params = data.query.sql_with_params()
 
340
        cursor = data.query.get_compiler('default').execute_sql(CURSOR)
192
341
        last_sql = cursor.db.ops.last_executed_query(cursor, sql, params)
193
342
        self.assertIsInstance(last_sql, six.text_type)
194
343
 
204
353
 
205
354
 
206
355
class ParameterHandlingTest(TestCase):
 
356
 
207
357
    def test_bad_parameter_count(self):
208
358
        "An executemany call with too many/not enough parameters will raise an exception (Refs #12612)"
209
359
        cursor = connection.cursor()
264
414
 
265
415
 
266
416
class SequenceResetTest(TestCase):
 
417
 
267
418
    def test_generic_relation(self):
268
419
        "Sequence names are correct when resetting generic relations (Ref #13941)"
269
420
        # Create an object with a manually specified PK
281
432
        self.assertTrue(obj.pk > 10)
282
433
 
283
434
 
284
 
class PostgresVersionTest(TestCase):
285
 
    def assert_parses(self, version_string, version):
286
 
        self.assertEqual(pg_version._parse_version(version_string), version)
287
 
 
288
 
    def test_parsing(self):
289
 
        """Test PostgreSQL version parsing from `SELECT version()` output"""
290
 
        self.assert_parses("PostgreSQL 8.3 beta4", 80300)
291
 
        self.assert_parses("PostgreSQL 8.3", 80300)
292
 
        self.assert_parses("EnterpriseDB 8.3", 80300)
293
 
        self.assert_parses("PostgreSQL 8.3.6", 80306)
294
 
        self.assert_parses("PostgreSQL 8.4beta1", 80400)
295
 
        self.assert_parses("PostgreSQL 8.3.1 on i386-apple-darwin9.2.2, compiled by GCC i686-apple-darwin9-gcc-4.0.1 (GCC) 4.0.1 (Apple Inc. build 5478)", 80301)
296
 
 
297
 
    def test_version_detection(self):
298
 
        """Test PostgreSQL version detection"""
299
 
 
300
 
        # Helper mocks
301
 
        class CursorMock(object):
302
 
            "Very simple mock of DB-API cursor"
303
 
            def execute(self, arg):
304
 
                pass
305
 
 
306
 
            def fetchone(self):
307
 
                return ["PostgreSQL 8.3"]
308
 
 
309
 
        class OlderConnectionMock(object):
310
 
            "Mock of psycopg2 (< 2.0.12) connection"
311
 
            def cursor(self):
312
 
                return CursorMock()
313
 
 
314
 
        # psycopg2 < 2.0.12 code path
315
 
        conn = OlderConnectionMock()
316
 
        self.assertEqual(pg_version.get_version(conn), 80300)
317
 
 
318
 
 
319
 
class PostgresNewConnectionTests(TestCase):
320
 
 
321
 
    @unittest.skipUnless(
322
 
        connection.vendor == 'postgresql',
323
 
        "This test applies only to PostgreSQL")
324
 
    def test_connect_and_rollback(self):
325
 
        """
326
 
        PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
327
 
        transaction is rolled back (#17062).
328
 
        """
329
 
        databases = copy.deepcopy(settings.DATABASES)
330
 
        new_connections = ConnectionHandler(databases)
331
 
        new_connection = new_connections[DEFAULT_DB_ALIAS]
332
 
        try:
333
 
            # Ensure the database default time zone is different than
334
 
            # the time zone in new_connection.settings_dict. We can
335
 
            # get the default time zone by reset & show.
336
 
            cursor = new_connection.cursor()
337
 
            cursor.execute("RESET TIMEZONE")
338
 
            cursor.execute("SHOW TIMEZONE")
339
 
            db_default_tz = cursor.fetchone()[0]
340
 
            new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
341
 
            new_connection.close()
342
 
 
343
 
            # Fetch a new connection with the new_tz as default
344
 
            # time zone, run a query and rollback.
345
 
            new_connection.settings_dict['TIME_ZONE'] = new_tz
346
 
            new_connection.enter_transaction_management()
347
 
            cursor = new_connection.cursor()
348
 
            new_connection.rollback()
349
 
 
350
 
            # Now let's see if the rollback rolled back the SET TIME ZONE.
351
 
            cursor.execute("SHOW TIMEZONE")
352
 
            tz = cursor.fetchone()[0]
353
 
            self.assertEqual(new_tz, tz)
354
 
        finally:
355
 
            new_connection.close()
356
 
 
357
 
    @unittest.skipUnless(
358
 
        connection.vendor == 'postgresql',
359
 
        "This test applies only to PostgreSQL")
360
 
    def test_connect_non_autocommit(self):
361
 
        """
362
 
        The connection wrapper shouldn't believe that autocommit is enabled
363
 
        after setting the time zone when AUTOCOMMIT is False (#21452).
364
 
        """
365
 
        databases = copy.deepcopy(settings.DATABASES)
366
 
        databases[DEFAULT_DB_ALIAS]['AUTOCOMMIT'] = False
367
 
        new_connections = ConnectionHandler(databases)
368
 
        new_connection = new_connections[DEFAULT_DB_ALIAS]
369
 
        try:
370
 
            # Open a database connection.
371
 
            new_connection.cursor()
372
 
            self.assertFalse(new_connection.get_autocommit())
373
 
        finally:
374
 
            new_connection.close()
375
 
 
376
 
 
377
435
# This test needs to run outside of a transaction, otherwise closing the
378
436
# connection would implicitly rollback and cause problems during teardown.
379
437
class ConnectionCreatedSignalTest(TransactionTestCase):
406
464
    EscapingChecksDebug test case, to also test CursorDebugWrapper.
407
465
    """
408
466
 
409
 
    # For Oracle, when you want to select a value, you need to specify the
410
 
    # special pseudo-table 'dual'; a select with no from clause is invalid.
411
 
    bare_select_suffix = " FROM DUAL" if connection.vendor == 'oracle' else ""
 
467
    bare_select_suffix = connection.features.bare_select_suffix
412
468
 
413
469
    def test_paramless_no_escaping(self):
414
470
        cursor = connection.cursor()
430
486
        # response should be an non-zero integer
431
487
        self.assertTrue(int(response))
432
488
 
 
489
 
433
490
@override_settings(DEBUG=True)
434
491
class EscapingChecksDebug(EscapingChecks):
435
492
    pass
436
493
 
437
494
 
438
 
class SqliteAggregationTests(TestCase):
439
 
    """
440
 
    #19360: Raise NotImplementedError when aggregating on date/time fields.
441
 
    """
442
 
    @unittest.skipUnless(connection.vendor == 'sqlite',
443
 
                         "No need to check SQLite aggregation semantics")
444
 
    def test_aggregation(self):
445
 
        for aggregate in (Sum, Avg, Variance, StdDev):
446
 
            self.assertRaises(NotImplementedError,
447
 
                models.Item.objects.all().aggregate, aggregate('time'))
448
 
            self.assertRaises(NotImplementedError,
449
 
                models.Item.objects.all().aggregate, aggregate('date'))
450
 
            self.assertRaises(NotImplementedError,
451
 
                models.Item.objects.all().aggregate, aggregate('last_modified'))
452
 
 
453
 
 
454
 
class SqliteChecks(TestCase):
455
 
 
456
 
    @unittest.skipUnless(connection.vendor == 'sqlite',
457
 
                         "No need to do SQLite checks")
458
 
    def test_convert_values_to_handle_null_value(self):
459
 
        database_operations = DatabaseOperations(connection)
460
 
        self.assertEqual(
461
 
            None,
462
 
            database_operations.convert_values(None, AutoField(primary_key=True))
463
 
        )
464
 
        self.assertEqual(
465
 
            None,
466
 
            database_operations.convert_values(None, DateField())
467
 
        )
468
 
        self.assertEqual(
469
 
            None,
470
 
            database_operations.convert_values(None, DateTimeField())
471
 
        )
472
 
        self.assertEqual(
473
 
            None,
474
 
            database_operations.convert_values(None, DecimalField())
475
 
        )
476
 
        self.assertEqual(
477
 
            None,
478
 
            database_operations.convert_values(None, IntegerField())
479
 
        )
480
 
        self.assertEqual(
481
 
            None,
482
 
            database_operations.convert_values(None, TimeField())
483
 
        )
484
 
 
485
 
 
486
495
class BackendTestCase(TestCase):
487
496
 
488
497
    def create_squares_with_executemany(self, args):
494
503
        tbl = connection.introspection.table_name_converter(opts.db_table)
495
504
        f1 = connection.ops.quote_name(opts.get_field('root').column)
496
505
        f2 = connection.ops.quote_name(opts.get_field('square').column)
497
 
        if paramstyle=='format':
 
506
        if paramstyle == 'format':
498
507
            query = 'INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (tbl, f1, f2)
499
 
        elif paramstyle=='pyformat':
 
508
        elif paramstyle == 'pyformat':
500
509
            query = 'INSERT INTO %s (%s, %s) VALUES (%%(root)s, %%(square)s)' % (tbl, f1, f2)
501
510
        else:
502
511
            raise ValueError("unsupported paramstyle in test")
507
516
 
508
517
    def test_cursor_executemany(self):
509
518
        #4896: Test cursor.executemany
510
 
        args = [(i, i**2) for i in range(-5, 6)]
 
519
        args = [(i, i ** 2) for i in range(-5, 6)]
511
520
        self.create_squares_with_executemany(args)
512
521
        self.assertEqual(models.Square.objects.count(), 11)
513
522
        for i in range(-5, 6):
514
523
            square = models.Square.objects.get(root=i)
515
 
            self.assertEqual(square.square, i**2)
 
524
            self.assertEqual(square.square, i ** 2)
516
525
 
517
526
    def test_cursor_executemany_with_empty_params_list(self):
518
527
        #4765: executemany with params=[] does nothing
522
531
 
523
532
    def test_cursor_executemany_with_iterator(self):
524
533
        #10320: executemany accepts iterators
525
 
        args = iter((i, i**2) for i in range(-3, 2))
 
534
        args = iter((i, i ** 2) for i in range(-3, 2))
526
535
        self.create_squares_with_executemany(args)
527
536
        self.assertEqual(models.Square.objects.count(), 5)
528
537
 
529
 
        args = iter((i, i**2) for i in range(3, 7))
 
538
        args = iter((i, i ** 2) for i in range(3, 7))
530
539
        with override_settings(DEBUG=True):
531
540
            # same test for DebugCursorWrapper
532
541
            self.create_squares_with_executemany(args)
534
543
 
535
544
    @skipUnlessDBFeature('supports_paramstyle_pyformat')
536
545
    def test_cursor_execute_with_pyformat(self):
537
 
        #10070: Support pyformat style passing of paramters
 
546
        #10070: Support pyformat style passing of parameters
538
547
        args = {'root': 3, 'square': 9}
539
548
        self.create_squares(args, 'pyformat', multiple=False)
540
549
        self.assertEqual(models.Square.objects.count(), 1)
541
550
 
542
551
    @skipUnlessDBFeature('supports_paramstyle_pyformat')
543
552
    def test_cursor_executemany_with_pyformat(self):
544
 
        #10070: Support pyformat style passing of paramters
545
 
        args = [{'root': i, 'square': i**2} for i in range(-5, 6)]
 
553
        #10070: Support pyformat style passing of parameters
 
554
        args = [{'root': i, 'square': i ** 2} for i in range(-5, 6)]
546
555
        self.create_squares(args, 'pyformat', multiple=True)
547
556
        self.assertEqual(models.Square.objects.count(), 11)
548
557
        for i in range(-5, 6):
549
558
            square = models.Square.objects.get(root=i)
550
 
            self.assertEqual(square.square, i**2)
 
559
            self.assertEqual(square.square, i ** 2)
551
560
 
552
561
    @skipUnlessDBFeature('supports_paramstyle_pyformat')
553
562
    def test_cursor_executemany_with_pyformat_iterator(self):
554
 
        args = iter({'root': i, 'square': i**2} for i in range(-3, 2))
 
563
        args = iter({'root': i, 'square': i ** 2} for i in range(-3, 2))
555
564
        self.create_squares(args, 'pyformat', multiple=True)
556
565
        self.assertEqual(models.Square.objects.count(), 5)
557
566
 
558
 
        args = iter({'root': i, 'square': i**2} for i in range(3, 7))
 
567
        args = iter({'root': i, 'square': i ** 2} for i in range(3, 7))
559
568
        with override_settings(DEBUG=True):
560
569
            # same test for DebugCursorWrapper
561
570
            self.create_squares(args, 'pyformat', multiple=True)
611
620
        with self.assertRaises(DatabaseError):
612
621
            cursor.execute(query)
613
622
 
 
623
    def test_cursor_contextmanager(self):
 
624
        """
 
625
        Test that cursors can be used as a context manager
 
626
        """
 
627
        with connection.cursor() as cursor:
 
628
            self.assertTrue(isinstance(cursor, CursorWrapper))
 
629
        # Both InterfaceError and ProgrammingError seem to be used when
 
630
        # accessing closed cursor (psycopg2 has InterfaceError, rest seem
 
631
        # to use ProgrammingError).
 
632
        with self.assertRaises(connection.features.closed_cursor_error_class):
 
633
            # cursor should be closed, so no queries should be possible.
 
634
            cursor.execute("select 1")
 
635
 
 
636
    @unittest.skipUnless(connection.vendor == 'postgresql',
 
637
                         "Psycopg2 specific cursor.closed attribute needed")
 
638
    def test_cursor_contextmanager_closing(self):
 
639
        # There isn't a generic way to test that cursors are closed, but
 
640
        # psycopg2 offers us a way to check that by closed attribute.
 
641
        # So, run only on psycopg2 for that reason.
 
642
        with connection.cursor() as cursor:
 
643
            self.assertTrue(isinstance(cursor, CursorWrapper))
 
644
        self.assertTrue(cursor.closed)
 
645
 
614
646
    # Unfortunately with sqlite3 the in-memory test database cannot be closed.
615
647
    @skipUnlessDBFeature('test_db_allows_multiple_connections')
616
648
    def test_is_usable_after_database_disconnects(self):
620
652
        Regression for #21553.
621
653
        """
622
654
        # Open a connection to the database.
623
 
        connection.cursor().close()
 
655
        with connection.cursor():
 
656
            pass
624
657
        # Emulate a connection close by the database.
625
658
        connection._close()
626
659
        # Even then is_usable() should not raise an exception.
704
737
        with transaction.atomic():
705
738
            # Create an Article.
706
739
            models.Article.objects.create(headline="Test article", pub_date=datetime.datetime(2010, 9, 4), reporter=self.r)
707
 
            # Retrive it from the DB
 
740
            # Retrieve it from the DB
708
741
            a = models.Article.objects.get(headline="Test article")
709
742
            a.reporter_id = 30
710
743
            try:
722
755
        with transaction.atomic():
723
756
            # Create an Article.
724
757
            models.Article.objects.create(headline="Test article", pub_date=datetime.datetime(2010, 9, 4), reporter=self.r)
725
 
            # Retrive it from the DB
 
758
            # Retrieve it from the DB
726
759
            a = models.Article.objects.get(headline="Test article")
727
760
            a.reporter_id = 30
728
761
            try:
739
772
        with transaction.atomic():
740
773
            # Create an Article.
741
774
            models.Article.objects.create(headline="Test article", pub_date=datetime.datetime(2010, 9, 4), reporter=self.r)
742
 
            # Retrive it from the DB
 
775
            # Retrieve it from the DB
743
776
            a = models.Article.objects.get(headline="Test article")
744
777
            a.reporter_id = 30
745
778
            with connection.constraint_checks_disabled():
905
938
class MySQLPKZeroTests(TestCase):
906
939
    """
907
940
    Zero as id for AutoField should raise exception in MySQL, because MySQL
908
 
    does not allow zero for automatic primary key.
 
941
    does not allow zero for autoincrement primary key.
909
942
    """
910
 
 
911
 
    @skipIfDBFeature('allows_primary_key_0')
 
943
    @skipIfDBFeature('allows_auto_pk_0')
912
944
    def test_zero_as_autoval(self):
913
945
        with self.assertRaises(ValueError):
914
946
            models.Square.objects.create(id=0, root=0, square=1)
984
1016
              '0.1')
985
1017
        equal('0.1234567890', 12, 0,
986
1018
              '0')
 
1019
 
 
1020
 
 
1021
class DBTestSettingsRenamedTests(IgnoreAllDeprecationWarningsMixin, TestCase):
 
1022
 
 
1023
    mismatch_msg = ("Connection 'test-deprecation' has mismatched TEST "
 
1024
                    "and TEST_* database settings.")
 
1025
 
 
1026
    @classmethod
 
1027
    def setUpClass(cls):
 
1028
        # Silence "UserWarning: Overriding setting DATABASES can lead to
 
1029
        # unexpected behavior."
 
1030
        cls.warning_classes.append(UserWarning)
 
1031
 
 
1032
    def setUp(self):
 
1033
        super(DBTestSettingsRenamedTests, self).setUp()
 
1034
        self.handler = ConnectionHandler()
 
1035
        self.db_settings = {'default': {}}
 
1036
 
 
1037
    def test_mismatched_database_test_settings_1(self):
 
1038
        # if the TEST setting is used, all TEST_* keys must appear in it.
 
1039
        self.db_settings.update({
 
1040
            'test-deprecation': {
 
1041
                'TEST': {},
 
1042
                'TEST_NAME': 'foo',
 
1043
            }
 
1044
        })
 
1045
        with override_settings(DATABASES=self.db_settings):
 
1046
            with self.assertRaisesMessage(ImproperlyConfigured, self.mismatch_msg):
 
1047
                self.handler.prepare_test_settings('test-deprecation')
 
1048
 
 
1049
    def test_mismatched_database_test_settings_2(self):
 
1050
        # if the TEST setting is used, all TEST_* keys must match.
 
1051
        self.db_settings.update({
 
1052
            'test-deprecation': {
 
1053
                'TEST': {'NAME': 'foo'},
 
1054
                'TEST_NAME': 'bar',
 
1055
            },
 
1056
        })
 
1057
        with override_settings(DATABASES=self.db_settings):
 
1058
            with self.assertRaisesMessage(ImproperlyConfigured, self.mismatch_msg):
 
1059
                self.handler.prepare_test_settings('test-deprecation')
 
1060
 
 
1061
    def test_mismatched_database_test_settings_3(self):
 
1062
        # Verifies the mapping of an aliased key.
 
1063
        self.db_settings.update({
 
1064
            'test-deprecation': {
 
1065
                'TEST': {'CREATE_DB': 'foo'},
 
1066
                'TEST_CREATE': 'bar',
 
1067
            },
 
1068
        })
 
1069
        with override_settings(DATABASES=self.db_settings):
 
1070
            with self.assertRaisesMessage(ImproperlyConfigured, self.mismatch_msg):
 
1071
                self.handler.prepare_test_settings('test-deprecation')
 
1072
 
 
1073
    def test_mismatched_database_test_settings_4(self):
 
1074
        # Verifies the mapping of an aliased key when the aliased key is missing.
 
1075
        self.db_settings.update({
 
1076
            'test-deprecation': {
 
1077
                'TEST': {},
 
1078
                'TEST_CREATE': 'bar',
 
1079
            },
 
1080
        })
 
1081
        with override_settings(DATABASES=self.db_settings):
 
1082
            with self.assertRaisesMessage(ImproperlyConfigured, self.mismatch_msg):
 
1083
                self.handler.prepare_test_settings('test-deprecation')
 
1084
 
 
1085
    def test_mismatched_settings_old_none(self):
 
1086
        self.db_settings.update({
 
1087
            'test-deprecation': {
 
1088
                'TEST': {'CREATE_DB': None},
 
1089
                'TEST_CREATE': '',
 
1090
            },
 
1091
        })
 
1092
        with override_settings(DATABASES=self.db_settings):
 
1093
            with self.assertRaisesMessage(ImproperlyConfigured, self.mismatch_msg):
 
1094
                self.handler.prepare_test_settings('test-deprecation')
 
1095
 
 
1096
    def test_mismatched_settings_new_none(self):
 
1097
        self.db_settings.update({
 
1098
            'test-deprecation': {
 
1099
                'TEST': {},
 
1100
                'TEST_CREATE': None,
 
1101
            },
 
1102
        })
 
1103
        with override_settings(DATABASES=self.db_settings):
 
1104
            with self.assertRaisesMessage(ImproperlyConfigured, self.mismatch_msg):
 
1105
                self.handler.prepare_test_settings('test-deprecation')
 
1106
 
 
1107
    def test_matched_test_settings(self):
 
1108
        # should be able to define new settings and the old, if they match
 
1109
        self.db_settings.update({
 
1110
            'test-deprecation': {
 
1111
                'TEST': {'NAME': 'foo'},
 
1112
                'TEST_NAME': 'foo',
 
1113
            },
 
1114
        })
 
1115
        with override_settings(DATABASES=self.db_settings):
 
1116
            self.handler.prepare_test_settings('test-deprecation')
 
1117
 
 
1118
    def test_new_settings_only(self):
 
1119
        # should be able to define new settings without the old
 
1120
        self.db_settings.update({
 
1121
            'test-deprecation': {
 
1122
                'TEST': {'NAME': 'foo'},
 
1123
            },
 
1124
        })
 
1125
        with override_settings(DATABASES=self.db_settings):
 
1126
            self.handler.prepare_test_settings('test-deprecation')
 
1127
 
 
1128
    def test_old_settings_only(self):
 
1129
        # should be able to define old settings without the new
 
1130
        self.db_settings.update({
 
1131
            'test-deprecation': {
 
1132
                'TEST_NAME': 'foo',
 
1133
            },
 
1134
        })
 
1135
        with override_settings(DATABASES=self.db_settings):
 
1136
            self.handler.prepare_test_settings('test-deprecation')
 
1137
 
 
1138
    def test_empty_settings(self):
 
1139
        with override_settings(DATABASES=self.db_settings):
 
1140
            self.handler.prepare_test_settings('default')