~ubuntu-branches/ubuntu/trusty/python-webob/trusty-proposed

« back to all changes in this revision

Viewing changes to tests/test_response.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2013-01-07 07:52:32 UTC
  • mfrom: (1.3.5)
  • Revision ID: package-import@ubuntu.com-20130107075232-w6x8r94du3t48wj4
Tags: 1.2.3-0ubuntu1
* New upstream release:
  - Dropped debian/patches/01_lp_920197.patch: no longer needed.
  - debian/watch: Update to point to pypi.
  - debian/rules: Disable docs build.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import sys
2
1
import zlib
3
 
if sys.version >= '2.7':
4
 
    from io import BytesIO as StringIO
5
 
else:
6
 
    from cStringIO import StringIO
7
 
try:
8
 
    from hashlib import md5
9
 
except ImportError:
10
 
    from md5 import md5
 
2
import io
11
3
 
12
4
from nose.tools import eq_, ok_, assert_raises
13
5
 
14
 
from webob import BaseRequest, Request, Response
 
6
from webob.request import BaseRequest
 
7
from webob.request import Request
 
8
from webob.response import Response
 
9
from webob.compat import text_
 
10
from webob.compat import bytes_
15
11
 
16
12
def simple_app(environ, start_response):
17
13
    start_response('200 OK', [
23
19
    req = BaseRequest.blank('/')
24
20
    res = req.get_response(simple_app)
25
21
    assert res.status == '200 OK'
26
 
    assert res.status_int == 200
 
22
    assert res.status_code == 200
27
23
    assert res.body == "OK"
28
24
    assert res.charset == 'utf8'
29
25
    assert res.content_type == 'text/html'
30
26
    res.status = 404
31
27
    assert res.status == '404 Not Found'
32
 
    assert res.status_int == 404
33
 
    res.body = 'Not OK'
34
 
    assert ''.join(res.app_iter) == 'Not OK'
 
28
    assert res.status_code == 404
 
29
    res.body = b'Not OK'
 
30
    assert b''.join(res.app_iter) == b'Not OK'
35
31
    res.charset = 'iso8859-1'
36
32
    assert res.headers['content-type'] == 'text/html; charset=iso8859-1'
37
33
    res.content_type = 'text/xml'
41
37
    assert res.headerlist == [('content-type', 'text/html')]
42
38
    res.set_cookie('x', 'y')
43
39
    assert res.headers['set-cookie'].strip(';') == 'x=y; Path=/'
 
40
    res.set_cookie(text_('x'), text_('y'))
 
41
    assert res.headers['set-cookie'].strip(';') == 'x=y; Path=/'
44
42
    res = Response('a body', '200 OK', content_type='text/html')
45
43
    res.encode_content()
46
44
    assert res.content_encoding == 'gzip'
47
 
    eq_(res.body, '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffKTH\xcaO\xa9\x04\x00\xf6\x86GI\x06\x00\x00\x00')
 
45
    eq_(res.body, b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffKTH\xcaO\xa9\x04\x00\xf6\x86GI\x06\x00\x00\x00')
48
46
    res.decode_content()
49
47
    assert res.content_encoding is None
50
 
    assert res.body == 'a body'
51
 
    res.set_cookie('x', u'foo') # test unicode value
 
48
    assert res.body == b'a body'
 
49
    res.set_cookie('x', text_(b'foo')) # test unicode value
52
50
    assert_raises(TypeError, Response, app_iter=iter(['a']),
53
51
                  body="somebody")
54
52
    del req.environ
55
 
    eq_(Response(request=req)._environ, req)
56
 
    eq_(Response(request=req)._request, None)
57
53
    assert_raises(TypeError, Response, charset=None,
58
 
                  body=u"unicode body")
 
54
                  body=text_(b"unicode body"))
59
55
    assert_raises(TypeError, Response, wrong_key='dummy')
60
56
 
 
57
def test_set_response_status_binary():
 
58
    req = BaseRequest.blank('/')
 
59
    res = req.get_response(simple_app)
 
60
    res.status == b'200 OK'
 
61
    assert res.status_code == 200
 
62
    assert res.status == '200 OK'
 
63
 
 
64
def test_set_response_status_str_no_reason():
 
65
    req = BaseRequest.blank('/')
 
66
    res = req.get_response(simple_app)
 
67
    res.status = '200'
 
68
    assert res.status_code == 200
 
69
    assert res.status == '200 OK'
 
70
 
 
71
def test_set_response_status_str_generic_reason():
 
72
    req = BaseRequest.blank('/')
 
73
    res = req.get_response(simple_app)
 
74
    res.status = '299'
 
75
    assert res.status_code == 299
 
76
    assert res.status == '299 Success'
 
77
 
 
78
def test_set_response_status_code():
 
79
    req = BaseRequest.blank('/')
 
80
    res = req.get_response(simple_app)
 
81
    res.status_code = 200
 
82
    assert res.status_code == 200
 
83
    assert res.status == '200 OK'
 
84
 
 
85
def test_set_response_status_code_generic_reason():
 
86
    req = BaseRequest.blank('/')
 
87
    res = req.get_response(simple_app)
 
88
    res.status_code = 299
 
89
    assert res.status_code == 299
 
90
    assert res.status == '299 Success'
 
91
 
61
92
def test_content_type():
62
93
    r = Response()
63
94
    # default ctype and charset
71
102
    r.content_type = None
72
103
    eq_(r.content_type, None)
73
104
 
 
105
def test_init_content_type_w_charset():
 
106
    v = 'text/plain;charset=ISO-8859-1'
 
107
    eq_(Response(content_type=v).headers['content-type'], v)
 
108
 
 
109
 
74
110
def test_cookies():
75
111
    res = Response()
76
 
    res.set_cookie('x', u'\N{BLACK SQUARE}') # test unicode value
77
 
    eq_(res.headers.getall('set-cookie'), ['x="\\342\\226\\240"; Path=/']) # uft8 encoded
 
112
    # test unicode value
 
113
    res.set_cookie('x', text_(b'\N{BLACK SQUARE}', 'unicode_escape'))
 
114
    # utf8 encoded
 
115
    eq_(res.headers.getall('set-cookie'), ['x="\\342\\226\\240"; Path=/'])
78
116
    r2 = res.merge_cookies(simple_app)
79
117
    r2 = BaseRequest.blank('/').get_response(r2)
80
118
    eq_(r2.headerlist,
94
132
    tval = 'application/x-test'
95
133
    r.headers.update({'content-type': tval})
96
134
    eq_(r.headers.getall('content-type'), [tval])
 
135
    r.headers.clear()
 
136
    assert not r.headerlist
97
137
 
98
138
def test_response_copy():
99
139
    r = Response(app_iter=iter(['a']))
113
153
def test_HEAD_closes():
114
154
    req = Request.blank('/')
115
155
    req.method = 'HEAD'
116
 
    app_iter = StringIO('foo')
 
156
    app_iter = io.BytesIO(b'foo')
117
157
    res = req.get_response(Response(app_iter=app_iter))
118
 
    eq_(res.status_int, 200)
119
 
    eq_(res.body, '')
 
158
    eq_(res.status_code, 200)
 
159
    eq_(res.body, b'')
120
160
    ok_(app_iter.closed)
121
161
 
122
162
def test_HEAD_conditional_response_returns_empty_response():
123
 
    from webob.response import EmptyResponse
124
 
    req = Request.blank('/')
125
 
    req.method = 'HEAD'
126
 
    res = Response(request=req, conditional_response=True)
127
 
    class FakeRequest:
128
 
        method = 'HEAD'
129
 
        if_none_match = 'none'
130
 
        if_modified_since = False
131
 
        range = False
132
 
        def __init__(self, env):
133
 
            self.env = env
 
163
    req = Request.blank('/',
 
164
        method='HEAD',
 
165
        if_none_match='none'
 
166
    )
 
167
    res = Response(conditional_response=True)
134
168
    def start_response(status, headerlist):
135
169
        pass
136
 
    res.RequestClass = FakeRequest
137
 
    result = res({}, start_response)
138
 
    ok_(isinstance(result, EmptyResponse))
 
170
    result = res(req.environ, start_response)
 
171
    assert not list(result)
139
172
 
140
173
def test_HEAD_conditional_response_range_empty_response():
141
 
    from webob.response import EmptyResponse
142
 
    req = Request.blank('/')
143
 
    req.method = 'HEAD'
144
 
    res = Response(request=req, conditional_response=True)
145
 
    res.status_int = 200
146
 
    res.body = 'Are we not men?'
147
 
    res.content_length = len(res.body)
148
 
    class FakeRequest:
149
 
        method = 'HEAD'
150
 
        if_none_match = 'none'
151
 
        if_modified_since = False
152
 
        def __init__(self, env):
153
 
            self.env = env
154
 
            self.range = self # simulate inner api
155
 
            self.if_range = self
156
 
        def content_range(self, length):
157
 
            """range attr"""
158
 
            class Range:
159
 
                start = 4
160
 
                stop = 5
161
 
            return Range
162
 
        def match_response(self, res):
163
 
            """if_range_match attr"""
164
 
            return True
165
 
    def start_response(status, headerlist):
166
 
        pass
167
 
    res.RequestClass = FakeRequest
168
 
    result = res({}, start_response)
169
 
    ok_(isinstance(result, EmptyResponse), result)
 
174
    req = Request.blank('/',
 
175
        method = 'HEAD',
 
176
        range=(4,5),
 
177
    )
 
178
    res = Response('Are we not men?', conditional_response=True)
 
179
    assert req.get_response(res).body == b''
 
180
 
170
181
 
171
182
def test_conditional_response_if_none_match_false():
172
183
    req = Request.blank('/', if_none_match='foo')
173
184
    resp = Response(app_iter=['foo\n'],
 
185
            conditional_response=True, etag='bar')
 
186
    resp = req.get_response(resp)
 
187
    eq_(resp.status_code, 200)
 
188
 
 
189
def test_conditional_response_if_none_match_true():
 
190
    req = Request.blank('/', if_none_match='foo')
 
191
    resp = Response(app_iter=['foo\n'],
174
192
            conditional_response=True, etag='foo')
175
193
    resp = req.get_response(resp)
176
 
    eq_(resp.status_int, 304)
177
 
 
178
 
def test_conditional_response_if_none_match_true():
179
 
    req = Request.blank('/', if_none_match='foo')
180
 
    resp = Response(app_iter=['foo\n'],
181
 
            conditional_response=True, etag='bar')
182
 
    resp = req.get_response(resp)
183
 
    eq_(resp.status_int, 200)
 
194
    eq_(resp.status_code, 304)
 
195
 
 
196
def test_conditional_response_if_none_match_weak():
 
197
    req = Request.blank('/', headers={'if-none-match': '"bar"'})
 
198
    req_weak = Request.blank('/', headers={'if-none-match': 'W/"bar"'})
 
199
    resp = Response(app_iter=['foo\n'], conditional_response=True, etag='bar')
 
200
    resp_weak = Response(app_iter=['foo\n'], conditional_response=True, headers={'etag': 'W/"bar"'})
 
201
    for rq in [req, req_weak]:
 
202
        for rp in [resp, resp_weak]:
 
203
            rq.get_response(rp).status_code == 304
 
204
 
 
205
    r2 = Response(app_iter=['foo\n'], conditional_response=True, headers={'etag': '"foo"'})
 
206
    r2_weak = Response(app_iter=['foo\n'], conditional_response=True, headers={'etag': 'W/"foo"'})
 
207
    req_weak.get_response(r2).status_code == 200
 
208
    req.get_response(r2_weak) == 200
 
209
 
184
210
 
185
211
def test_conditional_response_if_modified_since_false():
186
212
    from datetime import datetime, timedelta
188
214
    resp = Response(app_iter=['foo\n'], conditional_response=True,
189
215
            last_modified=req.if_modified_since-timedelta(seconds=1))
190
216
    resp = req.get_response(resp)
191
 
    eq_(resp.status_int, 304)
 
217
    eq_(resp.status_code, 304)
192
218
 
193
219
def test_conditional_response_if_modified_since_true():
194
220
    from datetime import datetime, timedelta
196
222
    resp = Response(app_iter=['foo\n'], conditional_response=True,
197
223
            last_modified=req.if_modified_since+timedelta(seconds=1))
198
224
    resp = req.get_response(resp)
199
 
    eq_(resp.status_int, 200)
 
225
    eq_(resp.status_code, 200)
200
226
 
201
227
def test_conditional_response_range_not_satisfiable_response():
202
228
    req = Request.blank('/', range='bytes=100-200')
203
229
    resp = Response(app_iter=['foo\n'], content_length=4,
204
230
            conditional_response=True)
205
231
    resp = req.get_response(resp)
206
 
    eq_(resp.status_int, 416)
 
232
    eq_(resp.status_code, 416)
207
233
    eq_(resp.content_range.start, None)
208
234
    eq_(resp.content_range.stop, None)
209
235
    eq_(resp.content_range.length, 4)
210
 
    eq_(resp.body, 'Requested range not satisfiable: bytes=100-200')
 
236
    eq_(resp.body, b'Requested range not satisfiable: bytes=100-200')
211
237
 
212
238
def test_HEAD_conditional_response_range_not_satisfiable_response():
213
239
    req = Request.blank('/', method='HEAD', range='bytes=100-200')
214
240
    resp = Response(app_iter=['foo\n'], content_length=4,
215
241
            conditional_response=True)
216
242
    resp = req.get_response(resp)
217
 
    eq_(resp.status_int, 416)
 
243
    eq_(resp.status_code, 416)
218
244
    eq_(resp.content_range.start, None)
219
245
    eq_(resp.content_range.stop, None)
220
246
    eq_(resp.content_range.length, 4)
221
 
    eq_(resp.body, '')
222
 
 
223
 
def test_del_environ():
224
 
    res = Response()
225
 
    res.environ = {'yo': 'mama'}
226
 
    eq_(res.environ, {'yo': 'mama'})
227
 
    del res.environ
228
 
    eq_(res.environ, None)
229
 
    eq_(res.request, None)
230
 
 
231
 
def test_set_request_environ():
232
 
    res = Response()
233
 
    class FakeRequest:
234
 
        environ = {'jo': 'mama'}
235
 
    res.request = FakeRequest
236
 
    eq_(res.environ, {'jo': 'mama'})
237
 
    eq_(res.request, FakeRequest)
238
 
    res.environ = None
239
 
    eq_(res.environ, None)
240
 
    eq_(res.request, None)
241
 
 
242
 
def test_del_request():
243
 
    res = Response()
244
 
    class FakeRequest:
245
 
        environ = {}
246
 
    res.request = FakeRequest
247
 
    del res.request
248
 
    eq_(res.environ, None)
249
 
    eq_(res.request, None)
250
 
 
251
 
def test_set_environ_via_request_subterfuge():
252
 
    class FakeRequest:
253
 
        def __init__(self, env):
254
 
            self.environ = env
255
 
    res = Response()
256
 
    res.RequestClass = FakeRequest
257
 
    res.request = {'action': 'dwim'}
258
 
    eq_(res.environ, {'action': 'dwim'})
259
 
    ok_(isinstance(res.request, FakeRequest))
260
 
    eq_(res.request.environ, res.environ)
261
 
 
262
 
def test_set_request():
263
 
    res = Response()
264
 
    class FakeRequest:
265
 
        environ = {'foo': 'bar'}
266
 
    res.request = FakeRequest
267
 
    eq_(res.request, FakeRequest)
268
 
    eq_(res.environ, FakeRequest.environ)
269
 
    res.request = None
270
 
    eq_(res.environ, None)
271
 
    eq_(res.request, None)
 
247
    eq_(resp.body, b'')
272
248
 
273
249
def test_md5_etag():
274
250
    res = Response()
275
 
    res.body = """\
 
251
    res.body = b"""\
276
252
In A.D. 2101
277
253
War was beginning.
278
254
Captain: What happen ?
294
270
    res.md5_etag()
295
271
    ok_(res.etag)
296
272
    ok_('\n' not in res.etag)
297
 
    eq_(res.etag,
298
 
        md5(res.body).digest().encode('base64').replace('\n', '').strip('='))
 
273
    eq_(res.etag, 'pN8sSTUrEaPRzmurGptqmw')
299
274
    eq_(res.content_md5, None)
300
275
 
301
276
def test_md5_etag_set_content_md5():
302
277
    res = Response()
303
 
    b = 'The quick brown fox jumps over the lazy dog'
304
 
    res.md5_etag(b, set_content_md5=True)
305
 
    ok_(res.content_md5,
306
 
        md5(b).digest().encode('base64').replace('\n', '').strip('='))
 
278
    body = b'The quick brown fox jumps over the lazy dog'
 
279
    res.md5_etag(body, set_content_md5=True)
 
280
    eq_(res.content_md5, 'nhB9nTcrtoJr2B01QqQZ1g==')
307
281
 
308
282
def test_decode_content_defaults_to_identity():
309
283
    res = Response()
310
 
    res.body = 'There be dragons'
 
284
    res.body = b'There be dragons'
311
285
    res.decode_content()
312
 
    eq_(res.body, 'There be dragons')
 
286
    eq_(res.body, b'There be dragons')
313
287
 
314
288
def test_decode_content_with_deflate():
315
289
    res = Response()
316
 
    b = 'Hey Hey Hey'
 
290
    body = b'Hey Hey Hey'
317
291
    # Simulate inflate by chopping the headers off
318
292
    # the gzip encoded data
319
 
    res.body = zlib.compress(b)[2:-4]
 
293
    res.body = zlib.compress(body)[2:-4]
320
294
    res.content_encoding = 'deflate'
321
295
    res.decode_content()
322
 
    eq_(res.body, b)
 
296
    eq_(res.body, body)
323
297
    eq_(res.content_encoding, None)
324
298
 
325
299
def test_content_length():
327
301
 
328
302
    req_head = Request.blank('/', method='HEAD')
329
303
    r1 = req_head.get_response(r0)
330
 
    eq_(r1.status_int, 200)
331
 
    eq_(r1.body, '')
 
304
    eq_(r1.status_code, 200)
 
305
    eq_(r1.body, b'')
332
306
    eq_(r1.content_length, 10)
333
307
 
334
308
    req_get = Request.blank('/')
335
309
    r2 = req_get.get_response(r0)
336
 
    eq_(r2.status_int, 200)
337
 
    eq_(r2.body, 'x'*10)
 
310
    eq_(r2.status_code, 200)
 
311
    eq_(r2.body, b'x'*10)
338
312
    eq_(r2.content_length, 10)
339
313
 
340
 
    r3 = Response(app_iter=['x']*10)
 
314
    r3 = Response(app_iter=[b'x']*10)
341
315
    eq_(r3.content_length, None)
342
 
    eq_(r3.body, 'x'*10)
 
316
    eq_(r3.body, b'x'*10)
343
317
    eq_(r3.content_length, 10)
344
318
 
345
 
    r4 = Response(app_iter=['x']*10, content_length=20) # wrong content_length
 
319
    r4 = Response(app_iter=[b'x']*10,
 
320
                  content_length=20) # wrong content_length
346
321
    eq_(r4.content_length, 20)
347
322
    assert_raises(AssertionError, lambda: r4.body)
348
323
 
349
324
    req_range = Request.blank('/', range=(0,5))
350
325
    r0.conditional_response = True
351
326
    r5 = req_range.get_response(r0)
352
 
    eq_(r5.status_int, 206)
353
 
    eq_(r5.body, 'xxxxx')
 
327
    eq_(r5.status_code, 206)
 
328
    eq_(r5.body, b'xxxxx')
354
329
    eq_(r5.content_length, 5)
355
330
 
356
331
def test_app_iter_range():
357
332
    req = Request.blank('/', range=(2,5))
358
333
    for app_iter in [
359
 
        ['012345'],
360
 
        ['0', '12345'],
361
 
        ['0', '1234', '5'],
362
 
        ['01', '2345'],
363
 
        ['01', '234', '5'],
364
 
        ['012', '34', '5'],
365
 
        ['012', '3', '4', '5'],
366
 
        ['012', '3', '45'],
367
 
        ['0', '12', '34', '5'],
368
 
        ['0', '12', '345'],
 
334
        [b'012345'],
 
335
        [b'0', b'12345'],
 
336
        [b'0', b'1234', b'5'],
 
337
        [b'01', b'2345'],
 
338
        [b'01', b'234', b'5'],
 
339
        [b'012', b'34', b'5'],
 
340
        [b'012', b'3', b'4', b'5'],
 
341
        [b'012', b'3', b'45'],
 
342
        [b'0', b'12', b'34', b'5'],
 
343
        [b'0', b'12', b'345'],
369
344
    ]:
370
345
        r = Response(
371
346
            app_iter=app_iter,
374
349
        )
375
350
        res = req.get_response(r)
376
351
        eq_(list(res.content_range), [2,5,6])
377
 
        eq_(res.body, '234', 'body=%r; app_iter=%r' % (res.body, app_iter))
 
352
        eq_(res.body, b'234', (res.body, app_iter))
378
353
 
379
354
def test_app_iter_range_inner_method():
380
355
    class FakeAppIter:
399
374
 
400
375
def test_from_file():
401
376
    res = Response('test')
402
 
    equal_resp(res)
403
 
    res = Response(app_iter=iter(['test ', 'body']),
404
 
                    content_type='text/plain')
405
 
    equal_resp(res)
406
 
 
407
 
def equal_resp(res):
408
 
    input_ = StringIO(str(res))
409
 
    res2 = Response.from_file(input_)
 
377
    inp = io.BytesIO(bytes_(str(res)))
 
378
    equal_resp(res, inp)
 
379
 
 
380
def test_from_file2():
 
381
    res = Response(app_iter=iter([b'test ', b'body']),
 
382
                    content_type='text/plain')
 
383
    inp = io.BytesIO(bytes_(str(res)))
 
384
    equal_resp(res, inp)
 
385
 
 
386
def test_from_text_file():
 
387
    res = Response('test')
 
388
    inp = io.StringIO(text_(str(res), 'utf-8'))
 
389
    equal_resp(res, inp)
 
390
    res = Response(app_iter=iter([b'test ', b'body']),
 
391
                    content_type='text/plain')
 
392
    inp = io.StringIO(text_(str(res), 'utf-8'))
 
393
    equal_resp(res, inp)
 
394
 
 
395
def equal_resp(res, inp):
 
396
    res2 = Response.from_file(inp)
410
397
    eq_(res.body, res2.body)
411
398
    eq_(res.headers, res2.headers)
412
399
 
413
400
def test_from_file_w_leading_space_in_header():
414
401
    # Make sure the removal of code dealing with leading spaces is safe
415
402
    res1 = Response()
416
 
    file_w_space = StringIO('200 OK\n\tContent-Type: text/html; charset=UTF-8')
 
403
    file_w_space = io.BytesIO(
 
404
        b'200 OK\n\tContent-Type: text/html; charset=UTF-8')
417
405
    res2 = Response.from_file(file_w_space)
418
406
    eq_(res1.headers, res2.headers)
419
407
 
420
408
def test_file_bad_header():
421
 
    file_w_bh = StringIO('200 OK\nBad Header')
 
409
    file_w_bh = io.BytesIO(b'200 OK\nBad Header')
422
410
    assert_raises(ValueError, Response.from_file, file_w_bh)
423
411
 
424
412
def test_set_status():
425
413
    res = Response()
426
 
    res.status = u"OK 200"
427
 
    eq_(res.status, "OK 200")
 
414
    res.status = "200"
 
415
    eq_(res.status, "200 OK")
428
416
    assert_raises(TypeError, setattr, res, 'status', float(200))
429
417
 
430
418
def test_set_headerlist():
443
431
    environ = {
444
432
        'wsgi.url_scheme': 'http',
445
433
        'HTTP_HOST': 'test.com',
446
 
        'SCRIPT_NAME': '/foobar',
447
434
    }
448
 
    eq_(_request_uri(environ), 'http://test.com/foobar')
 
435
    eq_(_request_uri(environ), 'http://test.com/')
449
436
 
450
437
def test_request_uri_https():
451
438
    from webob.response import _request_uri
463
450
    eq_(list(range), [])
464
451
 
465
452
def test_resp_write_app_iter_non_list():
466
 
    res = Response(app_iter=('a','b'))
 
453
    res = Response(app_iter=(b'a', b'b'))
467
454
    eq_(res.content_length, None)
468
 
    res.write('c')
469
 
    eq_(res.body, 'abc')
 
455
    res.write(b'c')
 
456
    eq_(res.body, b'abc')
470
457
    eq_(res.content_length, 3)
471
458
 
472
459
def test_response_file_body_writelines():
473
460
    from webob.response import ResponseBodyFile
474
 
    res = Response(app_iter=['foo'])
 
461
    res = Response(app_iter=[b'foo'])
475
462
    rbo = ResponseBodyFile(res)
476
463
    rbo.writelines(['bar', 'baz'])
477
 
    eq_(res.app_iter, ['foo', 'bar', 'baz'])
 
464
    eq_(res.app_iter, [b'foo', b'bar', b'baz'])
478
465
    rbo.flush() # noop
479
 
    eq_(res.app_iter, ['foo', 'bar', 'baz'])
 
466
    eq_(res.app_iter, [b'foo', b'bar', b'baz'])
480
467
 
481
468
def test_response_write_non_str():
482
469
    res = Response()
483
470
    assert_raises(TypeError, res.write, object())
484
471
 
485
472
def test_response_file_body_write_empty_app_iter():
486
 
    from webob.response import ResponseBodyFile
487
473
    res = Response('foo')
488
474
    res.write('baz')
489
 
    eq_(res.app_iter, ['foo', 'baz'])
 
475
    eq_(res.app_iter, [b'foo', b'baz'])
490
476
 
491
477
def test_response_file_body_write_empty_body():
492
478
    res = Response('')
493
479
    res.write('baz')
494
 
    eq_(res.app_iter, ['', 'baz'])
 
480
    eq_(res.app_iter, [b'', b'baz'])
495
481
 
496
482
def test_response_file_body_close_not_implemented():
497
483
    rbo = Response().body_file
510
496
    assert_raises(AttributeError, res.__getattribute__, 'body')
511
497
 
512
498
def test_body_get_is_unicode_notverylong():
513
 
    res = Response(app_iter=(u'foo',))
 
499
    res = Response(app_iter=(text_(b'foo'),))
514
500
    assert_raises(TypeError, res.__getattribute__, 'body')
515
501
 
516
502
def test_body_get_is_unicode():
517
 
    res = Response(app_iter=(['x'] * 51 + [u'x']))
 
503
    res = Response(app_iter=(['x'] * 51 + [text_(b'x')]))
518
504
    assert_raises(TypeError, res.__getattribute__, 'body')
519
505
 
520
506
def test_body_set_not_unicode_or_str():
523
509
 
524
510
def test_body_set_unicode():
525
511
    res = Response()
526
 
    assert_raises(TypeError, res.__setattr__, 'body', u'abc')
 
512
    assert_raises(TypeError, res.__setattr__, 'body', text_(b'abc'))
527
513
 
528
514
def test_body_set_under_body_doesnt_exist():
529
515
    res = Response('abc')
530
 
    eq_(res.body, 'abc')
 
516
    eq_(res.body, b'abc')
531
517
    eq_(res.content_length, 3)
532
518
 
533
519
def test_body_del():
534
520
    res = Response('123')
535
521
    del res.body
536
 
    eq_(res.body, '')
 
522
    eq_(res.body, b'')
537
523
    eq_(res.content_length, 0)
538
524
 
539
525
def test_text_get_no_charset():
543
529
def test_unicode_body():
544
530
    res = Response()
545
531
    res.charset = 'utf-8'
546
 
    bbody = 'La Pe\xc3\xb1a' # binary string
547
 
    ubody = unicode(bbody, 'utf-8') # unicode string
 
532
    bbody = b'La Pe\xc3\xb1a' # binary string
 
533
    ubody = text_(bbody, 'utf-8') # unicode string
548
534
    res.body = bbody
549
535
    eq_(res.unicode_body, ubody)
550
536
    res.ubody = ubody
551
537
    eq_(res.body, bbody)
552
538
    del res.ubody
553
 
    eq_(res.body, '')
 
539
    eq_(res.body, b'')
554
540
 
555
541
def test_text_get_decode():
556
542
    res = Response()
557
543
    res.charset = 'utf-8'
558
 
    res.body = 'La Pe\xc3\xb1a'
559
 
    eq_(res.text, unicode('La Pe\xc3\xb1a', 'utf-8'))
 
544
    res.body = b'La Pe\xc3\xb1a'
 
545
    eq_(res.text, text_(b'La Pe\xc3\xb1a', 'utf-8'))
560
546
 
561
547
def test_text_set_no_charset():
562
548
    res = Response()
567
553
    res = Response()
568
554
    res.charset = 'utf-8'
569
555
    assert_raises(TypeError, res.__setattr__, 'text',
570
 
                  'La Pe\xc3\xb1a')
 
556
                  b'La Pe\xc3\xb1a')
571
557
 
572
558
def test_text_del():
573
559
    res = Response('123')
574
560
    del res.text
575
 
    eq_(res.body, '')
 
561
    eq_(res.body, b'')
576
562
    eq_(res.content_length, 0)
577
563
 
578
564
def test_body_file_del():
579
565
    res = Response()
580
 
    res.body = '123'
 
566
    res.body = b'123'
581
567
    eq_(res.content_length, 3)
582
 
    eq_(res.app_iter, ['123'])
 
568
    eq_(res.app_iter, [b'123'])
583
569
    del res.body_file
584
 
    eq_(res.body, '')
 
570
    eq_(res.body, b'')
585
571
    eq_(res.content_length, 0)
586
572
 
587
573
def test_write_unicode():
588
574
    res = Response()
589
 
    res.text = unicode('La Pe\xc3\xb1a', 'utf-8')
590
 
    res.write(u'a')
591
 
    eq_(res.text, unicode('La Pe\xc3\xb1aa', 'utf-8'))
 
575
    res.text = text_(b'La Pe\xc3\xb1a', 'utf-8')
 
576
    res.write(text_(b'a'))
 
577
    eq_(res.text, text_(b'La Pe\xc3\xb1aa', 'utf-8'))
592
578
 
593
579
def test_write_unicode_no_charset():
594
580
    res = Response(charset=None)
595
 
    assert_raises(TypeError, res.write, u'a')
 
581
    assert_raises(TypeError, res.write, text_(b'a'))
596
582
 
597
583
def test_write_text():
598
584
    res = Response()
599
 
    res.body = 'abc'
600
 
    res.write(u'a')
 
585
    res.body = b'abc'
 
586
    res.write(text_(b'a'))
601
587
    eq_(res.text, 'abca')
602
588
 
603
589
def test_app_iter_del():
606
592
        app_iter=['123'],
607
593
    )
608
594
    del res.app_iter
609
 
    eq_(res.body, '')
 
595
    eq_(res.body, b'')
610
596
    eq_(res.content_length, None)
611
597
 
612
598
def test_charset_set_no_content_type_header():
699
685
 
700
686
def test_set_cookie_value_is_unicode():
701
687
    res = Response()
702
 
    val = unicode('La Pe\xc3\xb1a', 'utf-8')
 
688
    val = text_(b'La Pe\xc3\xb1a', 'utf-8')
703
689
    res.set_cookie('a', val)
704
 
    eq_(res.headerlist[-1], (r'Set-Cookie', 'a="La Pe\\303\\261a"; Path=/'))
 
690
    eq_(res.headerlist[-1], ('Set-Cookie', 'a="La Pe\\303\\261a"; Path=/'))
705
691
 
706
692
def test_delete_cookie():
707
693
    res = Response()
745
731
 
746
732
def test_unset_cookie_not_existing_and_not_strict():
747
733
    res = Response()
748
 
    result = res.unset_cookie('a', strict=False)
749
 
    assert result is None
 
734
    res.unset_cookie('a', strict=False) # no exception
 
735
 
750
736
 
751
737
def test_unset_cookie_not_existing_and_strict():
752
738
    res = Response()
758
744
    res.headers.add('Set-Cookie', 'b=3; Path=/')
759
745
    res.unset_cookie('a')
760
746
    eq_(res.headers.getall('Set-Cookie'), ['b=3; Path=/'])
 
747
    res.unset_cookie(text_('b'))
 
748
    eq_(res.headers.getall('Set-Cookie'), [])
761
749
 
762
750
def test_merge_cookies_no_set_cookie():
763
751
    res = Response()
789
777
 
790
778
def test_body_get_body_is_None_len_app_iter_is_zero():
791
779
    res = Response()
792
 
    res._app_iter = StringIO()
 
780
    res._app_iter = io.BytesIO()
793
781
    res._body = None
794
782
    result = res.body
795
 
    eq_(result, '')
 
783
    eq_(result, b'')
796
784
 
797
785
def test_cache_control_get():
798
786
    res = Response()
800
788
    eq_(res.cache_control.max_age, None)
801
789
 
802
790
def test_location():
803
 
    # covers webob/response.py:934-938
804
791
    res = Response()
805
792
    res.location = '/test.html'
806
793
    eq_(res.location, '/test.html')
848
835
    eq_(repr(res.cache_control),
849
836
        "<CacheControl 'max-age=0, must-revalidate, no-cache, no-store'>")
850
837
 
851
 
def test_status_int_set():
 
838
def test_status_code_set():
852
839
    res = Response()
853
 
    res.status_int = 400
 
840
    res.status_code = 400
854
841
    eq_(res._status, '400 Bad Request')
 
842
    res.status_int = 404
 
843
    eq_(res._status, '404 Not Found')
855
844
 
856
845
def test_cache_control_set_dict():
857
846
    res = Response()
865
854
 
866
855
def test_cache_control_set_unicode():
867
856
    res = Response()
868
 
    res.cache_control = u'abc'
 
857
    res.cache_control = text_(b'abc')
869
858
    eq_(repr(res.cache_control), "<CacheControl 'abc'>")
870
859
 
871
860
def test_cache_control_set_control_obj_is_not_None():
891
880
 
892
881
def test_body_file_write_no_charset():
893
882
    res = Response
894
 
    assert_raises(TypeError, res.write, u'foo')
 
883
    assert_raises(TypeError, res.write, text_('foo'))
895
884
 
896
885
def test_body_file_write_unicode_encodes():
897
 
    from webob.response import ResponseBodyFile
898
 
    s = unicode('La Pe\xc3\xb1a', 'utf-8')
 
886
    s = text_(b'La Pe\xc3\xb1a', 'utf-8')
899
887
    res = Response()
900
888
    res.write(s)
901
 
    eq_(res.app_iter, ['', 'La Pe\xc3\xb1a'])
 
889
    eq_(res.app_iter, [b'', b'La Pe\xc3\xb1a'])
902
890
 
903
891
def test_repr():
904
892
    res = Response()
947
935
 
948
936
def test_encode_content_gzip_notyet_gzipped():
949
937
    res = Response()
950
 
    res.app_iter = StringIO('foo')
 
938
    res.app_iter = io.BytesIO(b'foo')
951
939
    result = res.encode_content('gzip')
952
940
    eq_(result, None)
953
941
    eq_(res.content_length, 23)
954
 
    eq_(res.app_iter, ['\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff', '',
955
 
                       'K\xcb\xcf\x07\x00', '!es\x8c\x03\x00\x00\x00'])
 
942
    eq_(res.app_iter, [
 
943
        b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff',
 
944
        b'',
 
945
        b'K\xcb\xcf\x07\x00',
 
946
        b'!es\x8c\x03\x00\x00\x00'
 
947
        ])
956
948
 
957
949
def test_encode_content_gzip_notyet_gzipped_lazy():
958
950
    res = Response()
959
 
    res.app_iter = StringIO('foo')
 
951
    res.app_iter = io.BytesIO(b'foo')
960
952
    result = res.encode_content('gzip', lazy=True)
961
953
    eq_(result, None)
962
954
    eq_(res.content_length, None)
963
 
    eq_(list(res.app_iter), ['\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff', '',
964
 
                             'K\xcb\xcf\x07\x00', '!es\x8c\x03\x00\x00\x00'])
 
955
    eq_(list(res.app_iter), [
 
956
        b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff',
 
957
        b'',
 
958
        b'K\xcb\xcf\x07\x00',
 
959
        b'!es\x8c\x03\x00\x00\x00'
 
960
        ])
965
961
 
966
962
def test_decode_content_identity():
967
963
    res = Response()
976
972
 
977
973
def test_decode_content_gzip():
978
974
    from gzip import GzipFile
979
 
    io = StringIO()
980
 
    gzip_f = GzipFile(filename='', mode='w', fileobj=io)
981
 
    gzip_f.write('abc')
 
975
    io_ = io.BytesIO()
 
976
    gzip_f = GzipFile(filename='', mode='w', fileobj=io_)
 
977
    gzip_f.write(b'abc')
982
978
    gzip_f.close()
983
 
    body = io.getvalue()
 
979
    body = io_.getvalue()
984
980
    res = Response()
985
981
    res.content_encoding = 'gzip'
986
982
    res.body = body
987
983
    res.decode_content()
988
 
    eq_(res.body, 'abc')
 
984
    eq_(res.body, b'abc')
989
985
 
990
986
def test__abs_headerlist_location_with_scheme():
991
987
    res = Response()
994
990
    result = res._abs_headerlist({})
995
991
    eq_(result, [('Location', 'http:')])
996
992
 
997
 
def test_response_set_body_file():
998
 
    for data in ['abc', 'abcdef'*1024]:
999
 
        file = StringIO(data)
1000
 
        r = Response(body_file=file)
1001
 
        assert r.body == data
1002
 
 
 
993
def test__abs_headerlist_location_no_scheme():
 
994
    res = Response()
 
995
    res.content_encoding = 'gzip'
 
996
    res.headerlist = [('Location', '/abc')]
 
997
    result = res._abs_headerlist({'wsgi.url_scheme':'http',
 
998
                                  'HTTP_HOST':'example.com:80'})
 
999
    eq_(result, [('Location', 'http://example.com/abc')])
 
1000
 
 
1001
def test_response_set_body_file1():
 
1002
     data  = b'abc'
 
1003
     file = io.BytesIO(data)
 
1004
     r = Response(body_file=file)
 
1005
     assert r.body == data
 
1006
 
 
1007
def test_response_set_body_file2():
 
1008
    data = b'abcdef'*1024
 
1009
    file = io.BytesIO(data)
 
1010
    r = Response(body_file=file)
 
1011
    assert r.body == data
 
1012
 
 
1013
def test_response_json_body():
 
1014
    r = Response(json_body={'a': 1})
 
1015
    assert r.body == b'{"a":1}', repr(r.body)
 
1016
    assert r.content_type == 'application/json'
 
1017
    r = Response()
 
1018
    r.json_body = {"b": 1}
 
1019
    assert r.content_type == 'text/html'
 
1020
    del r.json_body
 
1021
    assert r.body == b''
 
1022
 
 
1023
def test_cache_expires_set_zero_then_nonzero():
 
1024
    res = Response()
 
1025
    res.cache_expires(seconds=0)
 
1026
    res.cache_expires(seconds=1)
 
1027
    eq_(res.pragma, None)
 
1028
    ok_(not res.cache_control.no_cache)
 
1029
    ok_(not res.cache_control.no_store)
 
1030
    ok_(not res.cache_control.must_revalidate)
 
1031
    eq_(res.cache_control.max_age, 1)