~ubuntu-branches/debian/sid/python-django/sid

« back to all changes in this revision

Viewing changes to tests/requests/tests.py

  • Committer: Package Import Robot
  • Author(s): Luke Faraone
  • Date: 2013-11-07 15:33:49 UTC
  • mfrom: (1.3.12)
  • Revision ID: package-import@ubuntu.com-20131107153349-e31sc149l2szs3jb
Tags: 1.6-1
* New upstream version. Closes: #557474, #724637.
* python-django now also suggests the installation of ipython,
  bpython, python-django-doc, and libgdal1.
  Closes: #636511, #686333, #704203
* Set package maintainer to Debian Python Modules Team.
* Bump standards version to 3.9.5, no changes needed.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- encoding: utf-8 -*-
 
2
from __future__ import unicode_literals
 
3
 
 
4
import time
 
5
import warnings
 
6
from datetime import datetime, timedelta
 
7
from io import BytesIO
 
8
 
 
9
from django.db import connection, connections, DEFAULT_DB_ALIAS
 
10
from django.core import signals
 
11
from django.core.exceptions import SuspiciousOperation
 
12
from django.core.handlers.wsgi import WSGIRequest, LimitedStream
 
13
from django.http import HttpRequest, HttpResponse, parse_cookie, build_request_repr, UnreadablePostError
 
14
from django.test import SimpleTestCase, TransactionTestCase
 
15
from django.test.client import FakePayload
 
16
from django.test.utils import override_settings, str_prefix
 
17
from django.utils import six
 
18
from django.utils.unittest import skipIf
 
19
from django.utils.http import cookie_date, urlencode
 
20
from django.utils.six.moves.urllib.parse import urlencode as original_urlencode
 
21
from django.utils.timezone import utc
 
22
 
 
23
 
 
24
class RequestsTests(SimpleTestCase):
 
25
    def test_httprequest(self):
 
26
        request = HttpRequest()
 
27
        self.assertEqual(list(request.GET.keys()), [])
 
28
        self.assertEqual(list(request.POST.keys()), [])
 
29
        self.assertEqual(list(request.COOKIES.keys()), [])
 
30
        self.assertEqual(list(request.META.keys()), [])
 
31
 
 
32
    def test_httprequest_repr(self):
 
33
        request = HttpRequest()
 
34
        request.path = '/somepath/'
 
35
        request.GET = {'get-key': 'get-value'}
 
36
        request.POST = {'post-key': 'post-value'}
 
37
        request.COOKIES = {'post-key': 'post-value'}
 
38
        request.META = {'post-key': 'post-value'}
 
39
        self.assertEqual(repr(request), str_prefix("<HttpRequest\npath:/somepath/,\nGET:{%(_)s'get-key': %(_)s'get-value'},\nPOST:{%(_)s'post-key': %(_)s'post-value'},\nCOOKIES:{%(_)s'post-key': %(_)s'post-value'},\nMETA:{%(_)s'post-key': %(_)s'post-value'}>"))
 
40
        self.assertEqual(build_request_repr(request), repr(request))
 
41
        self.assertEqual(build_request_repr(request, path_override='/otherpath/', GET_override={'a': 'b'}, POST_override={'c': 'd'}, COOKIES_override={'e': 'f'}, META_override={'g': 'h'}),
 
42
                         str_prefix("<HttpRequest\npath:/otherpath/,\nGET:{%(_)s'a': %(_)s'b'},\nPOST:{%(_)s'c': %(_)s'd'},\nCOOKIES:{%(_)s'e': %(_)s'f'},\nMETA:{%(_)s'g': %(_)s'h'}>"))
 
43
 
 
44
    def test_wsgirequest(self):
 
45
        request = WSGIRequest({'PATH_INFO': 'bogus', 'REQUEST_METHOD': 'bogus', 'wsgi.input': BytesIO(b'')})
 
46
        self.assertEqual(list(request.GET.keys()), [])
 
47
        self.assertEqual(list(request.POST.keys()), [])
 
48
        self.assertEqual(list(request.COOKIES.keys()), [])
 
49
        self.assertEqual(set(request.META.keys()), set(['PATH_INFO', 'REQUEST_METHOD', 'SCRIPT_NAME', 'wsgi.input']))
 
50
        self.assertEqual(request.META['PATH_INFO'], 'bogus')
 
51
        self.assertEqual(request.META['REQUEST_METHOD'], 'bogus')
 
52
        self.assertEqual(request.META['SCRIPT_NAME'], '')
 
53
 
 
54
    def test_wsgirequest_with_script_name(self):
 
55
        """
 
56
        Ensure that the request's path is correctly assembled, regardless of
 
57
        whether or not the SCRIPT_NAME has a trailing slash.
 
58
        Refs #20169.
 
59
        """
 
60
        # With trailing slash
 
61
        request = WSGIRequest({'PATH_INFO': '/somepath/', 'SCRIPT_NAME': '/PREFIX/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
62
        self.assertEqual(request.path, '/PREFIX/somepath/')
 
63
        # Without trailing slash
 
64
        request = WSGIRequest({'PATH_INFO': '/somepath/', 'SCRIPT_NAME': '/PREFIX', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
65
        self.assertEqual(request.path, '/PREFIX/somepath/')
 
66
 
 
67
    def test_wsgirequest_with_force_script_name(self):
 
68
        """
 
69
        Ensure that the FORCE_SCRIPT_NAME setting takes precedence over the
 
70
        request's SCRIPT_NAME environment parameter.
 
71
        Refs #20169.
 
72
        """
 
73
        with override_settings(FORCE_SCRIPT_NAME='/FORCED_PREFIX/'):
 
74
            request = WSGIRequest({'PATH_INFO': '/somepath/', 'SCRIPT_NAME': '/PREFIX/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
75
            self.assertEqual(request.path, '/FORCED_PREFIX/somepath/')
 
76
 
 
77
    def test_wsgirequest_path_with_force_script_name_trailing_slash(self):
 
78
        """
 
79
        Ensure that the request's path is correctly assembled, regardless of
 
80
        whether or not the FORCE_SCRIPT_NAME setting has a trailing slash.
 
81
        Refs #20169.
 
82
        """
 
83
        # With trailing slash
 
84
        with override_settings(FORCE_SCRIPT_NAME='/FORCED_PREFIX/'):
 
85
            request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
86
            self.assertEqual(request.path, '/FORCED_PREFIX/somepath/')
 
87
        # Without trailing slash
 
88
        with override_settings(FORCE_SCRIPT_NAME='/FORCED_PREFIX'):
 
89
            request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
90
            self.assertEqual(request.path, '/FORCED_PREFIX/somepath/')
 
91
 
 
92
    def test_wsgirequest_repr(self):
 
93
        request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
94
        request.GET = {'get-key': 'get-value'}
 
95
        request.POST = {'post-key': 'post-value'}
 
96
        request.COOKIES = {'post-key': 'post-value'}
 
97
        request.META = {'post-key': 'post-value'}
 
98
        self.assertEqual(repr(request), str_prefix("<WSGIRequest\npath:/somepath/,\nGET:{%(_)s'get-key': %(_)s'get-value'},\nPOST:{%(_)s'post-key': %(_)s'post-value'},\nCOOKIES:{%(_)s'post-key': %(_)s'post-value'},\nMETA:{%(_)s'post-key': %(_)s'post-value'}>"))
 
99
        self.assertEqual(build_request_repr(request), repr(request))
 
100
        self.assertEqual(build_request_repr(request, path_override='/otherpath/', GET_override={'a': 'b'}, POST_override={'c': 'd'}, COOKIES_override={'e': 'f'}, META_override={'g': 'h'}),
 
101
                         str_prefix("<WSGIRequest\npath:/otherpath/,\nGET:{%(_)s'a': %(_)s'b'},\nPOST:{%(_)s'c': %(_)s'd'},\nCOOKIES:{%(_)s'e': %(_)s'f'},\nMETA:{%(_)s'g': %(_)s'h'}>"))
 
102
 
 
103
    def test_wsgirequest_path_info(self):
 
104
        def wsgi_str(path_info):
 
105
            path_info = path_info.encode('utf-8')           # Actual URL sent by the browser (bytestring)
 
106
            if six.PY3:
 
107
                path_info = path_info.decode('iso-8859-1')  # Value in the WSGI environ dict (native string)
 
108
            return path_info
 
109
        # Regression for #19468
 
110
        request = WSGIRequest({'PATH_INFO': wsgi_str("/سلام/"), 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
 
111
        self.assertEqual(request.path, "/سلام/")
 
112
 
 
113
    def test_parse_cookie(self):
 
114
        self.assertEqual(parse_cookie('invalid@key=true'), {})
 
115
 
 
116
    def test_httprequest_location(self):
 
117
        request = HttpRequest()
 
118
        self.assertEqual(request.build_absolute_uri(location="https://www.example.com/asdf"),
 
119
            'https://www.example.com/asdf')
 
120
 
 
121
        request.get_host = lambda: 'www.example.com'
 
122
        request.path = ''
 
123
        self.assertEqual(request.build_absolute_uri(location="/path/with:colons"),
 
124
            'http://www.example.com/path/with:colons')
 
125
 
 
126
    @override_settings(
 
127
        USE_X_FORWARDED_HOST=False,
 
128
        ALLOWED_HOSTS=[
 
129
            'forward.com', 'example.com', 'internal.com', '12.34.56.78',
 
130
            '[2001:19f0:feee::dead:beef:cafe]', 'xn--4ca9at.com',
 
131
            '.multitenant.com', 'INSENSITIVE.com',
 
132
            ])
 
133
    def test_http_get_host(self):
 
134
        # Check if X_FORWARDED_HOST is provided.
 
135
        request = HttpRequest()
 
136
        request.META = {
 
137
            'HTTP_X_FORWARDED_HOST': 'forward.com',
 
138
            'HTTP_HOST': 'example.com',
 
139
            'SERVER_NAME': 'internal.com',
 
140
            'SERVER_PORT': 80,
 
141
        }
 
142
        # X_FORWARDED_HOST is ignored.
 
143
        self.assertEqual(request.get_host(), 'example.com')
 
144
 
 
145
        # Check if X_FORWARDED_HOST isn't provided.
 
146
        request = HttpRequest()
 
147
        request.META = {
 
148
            'HTTP_HOST': 'example.com',
 
149
            'SERVER_NAME': 'internal.com',
 
150
            'SERVER_PORT': 80,
 
151
        }
 
152
        self.assertEqual(request.get_host(), 'example.com')
 
153
 
 
154
        # Check if HTTP_HOST isn't provided.
 
155
        request = HttpRequest()
 
156
        request.META = {
 
157
            'SERVER_NAME': 'internal.com',
 
158
            'SERVER_PORT': 80,
 
159
        }
 
160
        self.assertEqual(request.get_host(), 'internal.com')
 
161
 
 
162
        # Check if HTTP_HOST isn't provided, and we're on a nonstandard port
 
163
        request = HttpRequest()
 
164
        request.META = {
 
165
            'SERVER_NAME': 'internal.com',
 
166
            'SERVER_PORT': 8042,
 
167
        }
 
168
        self.assertEqual(request.get_host(), 'internal.com:8042')
 
169
 
 
170
        # Poisoned host headers are rejected as suspicious
 
171
        legit_hosts = [
 
172
            'example.com',
 
173
            'example.com:80',
 
174
            '12.34.56.78',
 
175
            '12.34.56.78:443',
 
176
            '[2001:19f0:feee::dead:beef:cafe]',
 
177
            '[2001:19f0:feee::dead:beef:cafe]:8080',
 
178
            'xn--4ca9at.com', # Punnycode for öäü.com
 
179
            'anything.multitenant.com',
 
180
            'multitenant.com',
 
181
            'insensitive.com',
 
182
        ]
 
183
 
 
184
        poisoned_hosts = [
 
185
            'example.com@evil.tld',
 
186
            'example.com:dr.frankenstein@evil.tld',
 
187
            'example.com:dr.frankenstein@evil.tld:80',
 
188
            'example.com:80/badpath',
 
189
            'example.com: recovermypassword.com',
 
190
            'other.com', # not in ALLOWED_HOSTS
 
191
        ]
 
192
 
 
193
        for host in legit_hosts:
 
194
            request = HttpRequest()
 
195
            request.META = {
 
196
                'HTTP_HOST': host,
 
197
            }
 
198
            request.get_host()
 
199
 
 
200
        for host in poisoned_hosts:
 
201
            with self.assertRaises(SuspiciousOperation):
 
202
                request = HttpRequest()
 
203
                request.META = {
 
204
                    'HTTP_HOST': host,
 
205
                }
 
206
                request.get_host()
 
207
 
 
208
    @override_settings(USE_X_FORWARDED_HOST=True, ALLOWED_HOSTS=['*'])
 
209
    def test_http_get_host_with_x_forwarded_host(self):
 
210
        # Check if X_FORWARDED_HOST is provided.
 
211
        request = HttpRequest()
 
212
        request.META = {
 
213
            'HTTP_X_FORWARDED_HOST': 'forward.com',
 
214
            'HTTP_HOST': 'example.com',
 
215
            'SERVER_NAME': 'internal.com',
 
216
            'SERVER_PORT': 80,
 
217
        }
 
218
        # X_FORWARDED_HOST is obeyed.
 
219
        self.assertEqual(request.get_host(), 'forward.com')
 
220
 
 
221
        # Check if X_FORWARDED_HOST isn't provided.
 
222
        request = HttpRequest()
 
223
        request.META = {
 
224
            'HTTP_HOST': 'example.com',
 
225
            'SERVER_NAME': 'internal.com',
 
226
            'SERVER_PORT': 80,
 
227
        }
 
228
        self.assertEqual(request.get_host(), 'example.com')
 
229
 
 
230
        # Check if HTTP_HOST isn't provided.
 
231
        request = HttpRequest()
 
232
        request.META = {
 
233
            'SERVER_NAME': 'internal.com',
 
234
            'SERVER_PORT': 80,
 
235
        }
 
236
        self.assertEqual(request.get_host(), 'internal.com')
 
237
 
 
238
        # Check if HTTP_HOST isn't provided, and we're on a nonstandard port
 
239
        request = HttpRequest()
 
240
        request.META = {
 
241
            'SERVER_NAME': 'internal.com',
 
242
            'SERVER_PORT': 8042,
 
243
        }
 
244
        self.assertEqual(request.get_host(), 'internal.com:8042')
 
245
 
 
246
        # Poisoned host headers are rejected as suspicious
 
247
        legit_hosts = [
 
248
            'example.com',
 
249
            'example.com:80',
 
250
            '12.34.56.78',
 
251
            '12.34.56.78:443',
 
252
            '[2001:19f0:feee::dead:beef:cafe]',
 
253
            '[2001:19f0:feee::dead:beef:cafe]:8080',
 
254
            'xn--4ca9at.com', # Punnycode for öäü.com
 
255
        ]
 
256
 
 
257
        poisoned_hosts = [
 
258
            'example.com@evil.tld',
 
259
            'example.com:dr.frankenstein@evil.tld',
 
260
            'example.com:dr.frankenstein@evil.tld:80',
 
261
            'example.com:80/badpath',
 
262
            'example.com: recovermypassword.com',
 
263
        ]
 
264
 
 
265
        for host in legit_hosts:
 
266
            request = HttpRequest()
 
267
            request.META = {
 
268
                'HTTP_HOST': host,
 
269
            }
 
270
            request.get_host()
 
271
 
 
272
        for host in poisoned_hosts:
 
273
            with self.assertRaises(SuspiciousOperation):
 
274
                request = HttpRequest()
 
275
                request.META = {
 
276
                    'HTTP_HOST': host,
 
277
                }
 
278
                request.get_host()
 
279
 
 
280
 
 
281
    @override_settings(DEBUG=True, ALLOWED_HOSTS=[])
 
282
    def test_host_validation_disabled_in_debug_mode(self):
 
283
        """If ALLOWED_HOSTS is empty and DEBUG is True, all hosts pass."""
 
284
        request = HttpRequest()
 
285
        request.META = {
 
286
            'HTTP_HOST': 'example.com',
 
287
        }
 
288
        self.assertEqual(request.get_host(), 'example.com')
 
289
 
 
290
 
 
291
    @override_settings(ALLOWED_HOSTS=[])
 
292
    def test_get_host_suggestion_of_allowed_host(self):
 
293
        """get_host() makes helpful suggestions if a valid-looking host is not in ALLOWED_HOSTS."""
 
294
        msg_invalid_host = "Invalid HTTP_HOST header: %r."
 
295
        msg_suggestion = msg_invalid_host + "You may need to add %r to ALLOWED_HOSTS."
 
296
 
 
297
        for host in [ # Valid-looking hosts
 
298
            'example.com',
 
299
            '12.34.56.78',
 
300
            '[2001:19f0:feee::dead:beef:cafe]',
 
301
            'xn--4ca9at.com', # Punnycode for öäü.com
 
302
        ]:
 
303
            request = HttpRequest()
 
304
            request.META = {'HTTP_HOST': host}
 
305
            self.assertRaisesMessage(
 
306
                SuspiciousOperation,
 
307
                msg_suggestion % (host, host),
 
308
                request.get_host
 
309
            )
 
310
 
 
311
        for domain, port in [ # Valid-looking hosts with a port number
 
312
            ('example.com', 80),
 
313
            ('12.34.56.78', 443),
 
314
            ('[2001:19f0:feee::dead:beef:cafe]', 8080),
 
315
        ]:
 
316
            host = '%s:%s' % (domain, port)
 
317
            request = HttpRequest()
 
318
            request.META = {'HTTP_HOST': host}
 
319
            self.assertRaisesMessage(
 
320
                SuspiciousOperation,
 
321
                msg_suggestion % (host, domain),
 
322
                request.get_host
 
323
            )
 
324
 
 
325
        for host in [ # Invalid hosts
 
326
            'example.com@evil.tld',
 
327
            'example.com:dr.frankenstein@evil.tld',
 
328
            'example.com:dr.frankenstein@evil.tld:80',
 
329
            'example.com:80/badpath',
 
330
            'example.com: recovermypassword.com',
 
331
        ]:
 
332
            request = HttpRequest()
 
333
            request.META = {'HTTP_HOST': host}
 
334
            self.assertRaisesMessage(
 
335
                SuspiciousOperation,
 
336
                msg_invalid_host % host,
 
337
                request.get_host
 
338
            )
 
339
 
 
340
 
 
341
    def test_near_expiration(self):
 
342
        "Cookie will expire when an near expiration time is provided"
 
343
        response = HttpResponse()
 
344
        # There is a timing weakness in this test; The
 
345
        # expected result for max-age requires that there be
 
346
        # a very slight difference between the evaluated expiration
 
347
        # time, and the time evaluated in set_cookie(). If this
 
348
        # difference doesn't exist, the cookie time will be
 
349
        # 1 second larger. To avoid the problem, put in a quick sleep,
 
350
        # which guarantees that there will be a time difference.
 
351
        expires = datetime.utcnow() + timedelta(seconds=10)
 
352
        time.sleep(0.001)
 
353
        response.set_cookie('datetime', expires=expires)
 
354
        datetime_cookie = response.cookies['datetime']
 
355
        self.assertEqual(datetime_cookie['max-age'], 10)
 
356
 
 
357
    def test_aware_expiration(self):
 
358
        "Cookie accepts an aware datetime as expiration time"
 
359
        response = HttpResponse()
 
360
        expires = (datetime.utcnow() + timedelta(seconds=10)).replace(tzinfo=utc)
 
361
        time.sleep(0.001)
 
362
        response.set_cookie('datetime', expires=expires)
 
363
        datetime_cookie = response.cookies['datetime']
 
364
        self.assertEqual(datetime_cookie['max-age'], 10)
 
365
 
 
366
    def test_far_expiration(self):
 
367
        "Cookie will expire when an distant expiration time is provided"
 
368
        response = HttpResponse()
 
369
        response.set_cookie('datetime', expires=datetime(2028, 1, 1, 4, 5, 6))
 
370
        datetime_cookie = response.cookies['datetime']
 
371
        self.assertEqual(datetime_cookie['expires'], 'Sat, 01-Jan-2028 04:05:06 GMT')
 
372
 
 
373
    def test_max_age_expiration(self):
 
374
        "Cookie will expire if max_age is provided"
 
375
        response = HttpResponse()
 
376
        response.set_cookie('max_age', max_age=10)
 
377
        max_age_cookie = response.cookies['max_age']
 
378
        self.assertEqual(max_age_cookie['max-age'], 10)
 
379
        self.assertEqual(max_age_cookie['expires'], cookie_date(time.time()+10))
 
380
 
 
381
    def test_httponly_cookie(self):
 
382
        response = HttpResponse()
 
383
        response.set_cookie('example', httponly=True)
 
384
        example_cookie = response.cookies['example']
 
385
        # A compat cookie may be in use -- check that it has worked
 
386
        # both as an output string, and using the cookie attributes
 
387
        self.assertTrue('; httponly' in str(example_cookie))
 
388
        self.assertTrue(example_cookie['httponly'])
 
389
 
 
390
    def test_limited_stream(self):
 
391
        # Read all of a limited stream
 
392
        stream = LimitedStream(BytesIO(b'test'), 2)
 
393
        self.assertEqual(stream.read(), b'te')
 
394
        # Reading again returns nothing.
 
395
        self.assertEqual(stream.read(), b'')
 
396
 
 
397
        # Read a number of characters greater than the stream has to offer
 
398
        stream = LimitedStream(BytesIO(b'test'), 2)
 
399
        self.assertEqual(stream.read(5), b'te')
 
400
        # Reading again returns nothing.
 
401
        self.assertEqual(stream.readline(5), b'')
 
402
 
 
403
        # Read sequentially from a stream
 
404
        stream = LimitedStream(BytesIO(b'12345678'), 8)
 
405
        self.assertEqual(stream.read(5), b'12345')
 
406
        self.assertEqual(stream.read(5), b'678')
 
407
        # Reading again returns nothing.
 
408
        self.assertEqual(stream.readline(5), b'')
 
409
 
 
410
        # Read lines from a stream
 
411
        stream = LimitedStream(BytesIO(b'1234\n5678\nabcd\nefgh\nijkl'), 24)
 
412
        # Read a full line, unconditionally
 
413
        self.assertEqual(stream.readline(), b'1234\n')
 
414
        # Read a number of characters less than a line
 
415
        self.assertEqual(stream.readline(2), b'56')
 
416
        # Read the rest of the partial line
 
417
        self.assertEqual(stream.readline(), b'78\n')
 
418
        # Read a full line, with a character limit greater than the line length
 
419
        self.assertEqual(stream.readline(6), b'abcd\n')
 
420
        # Read the next line, deliberately terminated at the line end
 
421
        self.assertEqual(stream.readline(4), b'efgh')
 
422
        # Read the next line... just the line end
 
423
        self.assertEqual(stream.readline(), b'\n')
 
424
        # Read everything else.
 
425
        self.assertEqual(stream.readline(), b'ijkl')
 
426
 
 
427
        # Regression for #15018
 
428
        # If a stream contains a newline, but the provided length
 
429
        # is less than the number of provided characters, the newline
 
430
        # doesn't reset the available character count
 
431
        stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9)
 
432
        self.assertEqual(stream.readline(10), b'1234\n')
 
433
        self.assertEqual(stream.readline(3), b'abc')
 
434
        # Now expire the available characters
 
435
        self.assertEqual(stream.readline(3), b'd')
 
436
        # Reading again returns nothing.
 
437
        self.assertEqual(stream.readline(2), b'')
 
438
 
 
439
        # Same test, but with read, not readline.
 
440
        stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9)
 
441
        self.assertEqual(stream.read(6), b'1234\na')
 
442
        self.assertEqual(stream.read(2), b'bc')
 
443
        self.assertEqual(stream.read(2), b'd')
 
444
        self.assertEqual(stream.read(2), b'')
 
445
        self.assertEqual(stream.read(), b'')
 
446
 
 
447
    def test_stream(self):
 
448
        payload = FakePayload('name=value')
 
449
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
450
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
451
                               'CONTENT_LENGTH': len(payload),
 
452
                               'wsgi.input': payload})
 
453
        self.assertEqual(request.read(), b'name=value')
 
454
 
 
455
    def test_read_after_value(self):
 
456
        """
 
457
        Reading from request is allowed after accessing request contents as
 
458
        POST or body.
 
459
        """
 
460
        payload = FakePayload('name=value')
 
461
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
462
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
463
                               'CONTENT_LENGTH': len(payload),
 
464
                               'wsgi.input': payload})
 
465
        self.assertEqual(request.POST, {'name': ['value']})
 
466
        self.assertEqual(request.body, b'name=value')
 
467
        self.assertEqual(request.read(), b'name=value')
 
468
 
 
469
    def test_value_after_read(self):
 
470
        """
 
471
        Construction of POST or body is not allowed after reading
 
472
        from request.
 
473
        """
 
474
        payload = FakePayload('name=value')
 
475
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
476
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
477
                               'CONTENT_LENGTH': len(payload),
 
478
                               'wsgi.input': payload})
 
479
        self.assertEqual(request.read(2), b'na')
 
480
        self.assertRaises(Exception, lambda: request.body)
 
481
        self.assertEqual(request.POST, {})
 
482
 
 
483
    def test_non_ascii_POST(self):
 
484
        payload = FakePayload(urlencode({'key': 'España'}))
 
485
        request = WSGIRequest({
 
486
            'REQUEST_METHOD': 'POST',
 
487
            'CONTENT_LENGTH': len(payload),
 
488
            'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
489
            'wsgi.input': payload,
 
490
        })
 
491
        self.assertEqual(request.POST, {'key': ['España']})
 
492
 
 
493
    def test_alternate_charset_POST(self):
 
494
        """
 
495
        Test a POST with non-utf-8 payload encoding.
 
496
        """
 
497
        payload = FakePayload(original_urlencode({'key': 'España'.encode('latin-1')}))
 
498
        request = WSGIRequest({
 
499
            'REQUEST_METHOD': 'POST',
 
500
            'CONTENT_LENGTH': len(payload),
 
501
            'CONTENT_TYPE': 'application/x-www-form-urlencoded; charset=iso-8859-1',
 
502
            'wsgi.input': payload,
 
503
        })
 
504
        self.assertEqual(request.POST, {'key': ['España']})
 
505
 
 
506
    def test_body_after_POST_multipart_form_data(self):
 
507
        """
 
508
        Reading body after parsing multipart/form-data is not allowed
 
509
        """
 
510
        # Because multipart is used for large amounts fo data i.e. file uploads,
 
511
        # we don't want the data held in memory twice, and we don't want to
 
512
        # silence the error by setting body = '' either.
 
513
        payload = FakePayload("\r\n".join([
 
514
                '--boundary',
 
515
                'Content-Disposition: form-data; name="name"',
 
516
                '',
 
517
                'value',
 
518
                '--boundary--'
 
519
                '']))
 
520
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
521
                               'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
 
522
                               'CONTENT_LENGTH': len(payload),
 
523
                               'wsgi.input': payload})
 
524
        self.assertEqual(request.POST, {'name': ['value']})
 
525
        self.assertRaises(Exception, lambda: request.body)
 
526
 
 
527
    def test_body_after_POST_multipart_related(self):
 
528
        """
 
529
        Reading body after parsing multipart that isn't form-data is allowed
 
530
        """
 
531
        # Ticket #9054
 
532
        # There are cases in which the multipart data is related instead of
 
533
        # being a binary upload, in which case it should still be accessible
 
534
        # via body.
 
535
        payload_data = b"\r\n".join([
 
536
                b'--boundary',
 
537
                b'Content-ID: id; name="name"',
 
538
                b'',
 
539
                b'value',
 
540
                b'--boundary--'
 
541
                b''])
 
542
        payload = FakePayload(payload_data)
 
543
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
544
                               'CONTENT_TYPE': 'multipart/related; boundary=boundary',
 
545
                               'CONTENT_LENGTH': len(payload),
 
546
                               'wsgi.input': payload})
 
547
        self.assertEqual(request.POST, {})
 
548
        self.assertEqual(request.body, payload_data)
 
549
 
 
550
    def test_POST_multipart_with_content_length_zero(self):
 
551
        """
 
552
        Multipart POST requests with Content-Length >= 0 are valid and need to be handled.
 
553
        """
 
554
        # According to:
 
555
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.13
 
556
        # Every request.POST with Content-Length >= 0 is a valid request,
 
557
        # this test ensures that we handle Content-Length == 0.
 
558
        payload = FakePayload("\r\n".join([
 
559
                '--boundary',
 
560
                'Content-Disposition: form-data; name="name"',
 
561
                '',
 
562
                'value',
 
563
                '--boundary--'
 
564
                '']))
 
565
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
566
                               'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
 
567
                               'CONTENT_LENGTH': 0,
 
568
                               'wsgi.input': payload})
 
569
        self.assertEqual(request.POST, {})
 
570
 
 
571
    def test_POST_binary_only(self):
 
572
        payload = b'\r\n\x01\x00\x00\x00ab\x00\x00\xcd\xcc,@'
 
573
        environ = {'REQUEST_METHOD': 'POST',
 
574
                   'CONTENT_TYPE': 'application/octet-stream',
 
575
                   'CONTENT_LENGTH': len(payload),
 
576
                   'wsgi.input': BytesIO(payload)}
 
577
        request = WSGIRequest(environ)
 
578
        self.assertEqual(request.POST, {})
 
579
        self.assertEqual(request.FILES, {})
 
580
        self.assertEqual(request.body, payload)
 
581
 
 
582
        # Same test without specifying content-type
 
583
        environ.update({'CONTENT_TYPE': '', 'wsgi.input': BytesIO(payload)})
 
584
        request = WSGIRequest(environ)
 
585
        self.assertEqual(request.POST, {})
 
586
        self.assertEqual(request.FILES, {})
 
587
        self.assertEqual(request.body, payload)
 
588
 
 
589
    def test_read_by_lines(self):
 
590
        payload = FakePayload('name=value')
 
591
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
592
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
593
                               'CONTENT_LENGTH': len(payload),
 
594
                               'wsgi.input': payload})
 
595
        self.assertEqual(list(request), [b'name=value'])
 
596
 
 
597
    def test_POST_after_body_read(self):
 
598
        """
 
599
        POST should be populated even if body is read first
 
600
        """
 
601
        payload = FakePayload('name=value')
 
602
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
603
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
604
                               'CONTENT_LENGTH': len(payload),
 
605
                               'wsgi.input': payload})
 
606
        raw_data = request.body
 
607
        self.assertEqual(request.POST, {'name': ['value']})
 
608
 
 
609
    def test_POST_after_body_read_and_stream_read(self):
 
610
        """
 
611
        POST should be populated even if body is read first, and then
 
612
        the stream is read second.
 
613
        """
 
614
        payload = FakePayload('name=value')
 
615
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
616
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
617
                               'CONTENT_LENGTH': len(payload),
 
618
                               'wsgi.input': payload})
 
619
        raw_data = request.body
 
620
        self.assertEqual(request.read(1), b'n')
 
621
        self.assertEqual(request.POST, {'name': ['value']})
 
622
 
 
623
    def test_POST_after_body_read_and_stream_read_multipart(self):
 
624
        """
 
625
        POST should be populated even if body is read first, and then
 
626
        the stream is read second. Using multipart/form-data instead of urlencoded.
 
627
        """
 
628
        payload = FakePayload("\r\n".join([
 
629
                '--boundary',
 
630
                'Content-Disposition: form-data; name="name"',
 
631
                '',
 
632
                'value',
 
633
                '--boundary--'
 
634
                '']))
 
635
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
636
                               'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
 
637
                               'CONTENT_LENGTH': len(payload),
 
638
                               'wsgi.input': payload})
 
639
        raw_data = request.body
 
640
        # Consume enough data to mess up the parsing:
 
641
        self.assertEqual(request.read(13), b'--boundary\r\nC')
 
642
        self.assertEqual(request.POST, {'name': ['value']})
 
643
 
 
644
    def test_POST_connection_error(self):
 
645
        """
 
646
        If wsgi.input.read() raises an exception while trying to read() the
 
647
        POST, the exception should be identifiable (not a generic IOError).
 
648
        """
 
649
        class ExplodingBytesIO(BytesIO):
 
650
            def read(self, len=0):
 
651
                raise IOError("kaboom!")
 
652
 
 
653
        payload = b'name=value'
 
654
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
655
                               'CONTENT_TYPE': 'application/x-www-form-urlencoded',
 
656
                               'CONTENT_LENGTH': len(payload),
 
657
                               'wsgi.input': ExplodingBytesIO(payload)})
 
658
 
 
659
        with self.assertRaises(UnreadablePostError):
 
660
            request.body
 
661
 
 
662
    def test_FILES_connection_error(self):
 
663
        """
 
664
        If wsgi.input.read() raises an exception while trying to read() the
 
665
        FILES, the exception should be identifiable (not a generic IOError).
 
666
        """
 
667
        class ExplodingBytesIO(BytesIO):
 
668
            def read(self, len=0):
 
669
                raise IOError("kaboom!")
 
670
 
 
671
        payload = b'x'
 
672
        request = WSGIRequest({'REQUEST_METHOD': 'POST',
 
673
                               'CONTENT_TYPE': 'multipart/form-data; boundary=foo_',
 
674
                               'CONTENT_LENGTH': len(payload),
 
675
                               'wsgi.input': ExplodingBytesIO(payload)})
 
676
 
 
677
        with self.assertRaises(UnreadablePostError):
 
678
            request.FILES
 
679
 
 
680
 
 
681
@skipIf(connection.vendor == 'sqlite'
 
682
        and connection.settings_dict['TEST_NAME'] in (None, '', ':memory:'),
 
683
        "Cannot establish two connections to an in-memory SQLite database.")
 
684
class DatabaseConnectionHandlingTests(TransactionTestCase):
 
685
 
 
686
    available_apps = []
 
687
 
 
688
    def setUp(self):
 
689
        # Use a temporary connection to avoid messing with the main one.
 
690
        self._old_default_connection = connections['default']
 
691
        del connections['default']
 
692
 
 
693
    def tearDown(self):
 
694
        try:
 
695
            connections['default'].close()
 
696
        finally:
 
697
            connections['default'] = self._old_default_connection
 
698
 
 
699
    def test_request_finished_db_state(self):
 
700
        # Force closing connection on request end
 
701
        connection.settings_dict['CONN_MAX_AGE'] = 0
 
702
 
 
703
        # The GET below will not succeed, but it will give a response with
 
704
        # defined ._handler_class. That is needed for sending the
 
705
        # request_finished signal.
 
706
        response = self.client.get('/')
 
707
        # Make sure there is an open connection
 
708
        connection.cursor()
 
709
        connection.enter_transaction_management()
 
710
        signals.request_finished.send(sender=response._handler_class)
 
711
        self.assertEqual(len(connection.transaction_state), 0)
 
712
 
 
713
    def test_request_finished_failed_connection(self):
 
714
        # Force closing connection on request end
 
715
        connection.settings_dict['CONN_MAX_AGE'] = 0
 
716
 
 
717
        connection.enter_transaction_management()
 
718
        connection.set_dirty()
 
719
        # Test that the rollback doesn't succeed (for example network failure
 
720
        # could cause this).
 
721
        def fail_horribly():
 
722
            raise Exception("Horrible failure!")
 
723
        connection._rollback = fail_horribly
 
724
        try:
 
725
            with self.assertRaises(Exception):
 
726
                signals.request_finished.send(sender=self.__class__)
 
727
            # The connection's state wasn't cleaned up
 
728
            self.assertEqual(len(connection.transaction_state), 1)
 
729
        finally:
 
730
            del connection._rollback
 
731
        # The connection will be cleaned on next request where the conn
 
732
        # works again.
 
733
        signals.request_finished.send(sender=self.__class__)
 
734
        self.assertEqual(len(connection.transaction_state), 0)