3
if sys.version >= '2.7':
4
from io import BytesIO as StringIO
6
from cStringIO import StringIO
8
from hashlib import md5
12
4
from nose.tools import eq_, ok_, assert_raises
14
from webob import BaseRequest, Request, Response
6
from webob.request import BaseRequest
7
from webob.request import Request
8
from webob.response import Response
9
from webob.compat import text_
10
from webob.compat import bytes_
16
12
def simple_app(environ, start_response):
17
13
start_response('200 OK', [
23
19
req = BaseRequest.blank('/')
24
20
res = req.get_response(simple_app)
25
21
assert res.status == '200 OK'
26
assert res.status_int == 200
22
assert res.status_code == 200
27
23
assert res.body == "OK"
28
24
assert res.charset == 'utf8'
29
25
assert res.content_type == 'text/html'
31
27
assert res.status == '404 Not Found'
32
assert res.status_int == 404
34
assert ''.join(res.app_iter) == 'Not OK'
28
assert res.status_code == 404
30
assert b''.join(res.app_iter) == b'Not OK'
35
31
res.charset = 'iso8859-1'
36
32
assert res.headers['content-type'] == 'text/html; charset=iso8859-1'
37
33
res.content_type = 'text/xml'
41
37
assert res.headerlist == [('content-type', 'text/html')]
42
38
res.set_cookie('x', 'y')
43
39
assert res.headers['set-cookie'].strip(';') == 'x=y; Path=/'
40
res.set_cookie(text_('x'), text_('y'))
41
assert res.headers['set-cookie'].strip(';') == 'x=y; Path=/'
44
42
res = Response('a body', '200 OK', content_type='text/html')
45
43
res.encode_content()
46
44
assert res.content_encoding == 'gzip'
47
eq_(res.body, '\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffKTH\xcaO\xa9\x04\x00\xf6\x86GI\x06\x00\x00\x00')
45
eq_(res.body, b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffKTH\xcaO\xa9\x04\x00\xf6\x86GI\x06\x00\x00\x00')
48
46
res.decode_content()
49
47
assert res.content_encoding is None
50
assert res.body == 'a body'
51
res.set_cookie('x', u'foo') # test unicode value
48
assert res.body == b'a body'
49
res.set_cookie('x', text_(b'foo')) # test unicode value
52
50
assert_raises(TypeError, Response, app_iter=iter(['a']),
55
eq_(Response(request=req)._environ, req)
56
eq_(Response(request=req)._request, None)
57
53
assert_raises(TypeError, Response, charset=None,
54
body=text_(b"unicode body"))
59
55
assert_raises(TypeError, Response, wrong_key='dummy')
57
def test_set_response_status_binary():
58
req = BaseRequest.blank('/')
59
res = req.get_response(simple_app)
60
res.status == b'200 OK'
61
assert res.status_code == 200
62
assert res.status == '200 OK'
64
def test_set_response_status_str_no_reason():
65
req = BaseRequest.blank('/')
66
res = req.get_response(simple_app)
68
assert res.status_code == 200
69
assert res.status == '200 OK'
71
def test_set_response_status_str_generic_reason():
72
req = BaseRequest.blank('/')
73
res = req.get_response(simple_app)
75
assert res.status_code == 299
76
assert res.status == '299 Success'
78
def test_set_response_status_code():
79
req = BaseRequest.blank('/')
80
res = req.get_response(simple_app)
82
assert res.status_code == 200
83
assert res.status == '200 OK'
85
def test_set_response_status_code_generic_reason():
86
req = BaseRequest.blank('/')
87
res = req.get_response(simple_app)
89
assert res.status_code == 299
90
assert res.status == '299 Success'
61
92
def test_content_type():
63
94
# default ctype and charset
71
102
r.content_type = None
72
103
eq_(r.content_type, None)
105
def test_init_content_type_w_charset():
106
v = 'text/plain;charset=ISO-8859-1'
107
eq_(Response(content_type=v).headers['content-type'], v)
74
110
def test_cookies():
76
res.set_cookie('x', u'\N{BLACK SQUARE}') # test unicode value
77
eq_(res.headers.getall('set-cookie'), ['x="\\342\\226\\240"; Path=/']) # uft8 encoded
113
res.set_cookie('x', text_(b'\N{BLACK SQUARE}', 'unicode_escape'))
115
eq_(res.headers.getall('set-cookie'), ['x="\\342\\226\\240"; Path=/'])
78
116
r2 = res.merge_cookies(simple_app)
79
117
r2 = BaseRequest.blank('/').get_response(r2)
80
118
eq_(r2.headerlist,
113
153
def test_HEAD_closes():
114
154
req = Request.blank('/')
115
155
req.method = 'HEAD'
116
app_iter = StringIO('foo')
156
app_iter = io.BytesIO(b'foo')
117
157
res = req.get_response(Response(app_iter=app_iter))
118
eq_(res.status_int, 200)
158
eq_(res.status_code, 200)
120
160
ok_(app_iter.closed)
122
162
def test_HEAD_conditional_response_returns_empty_response():
123
from webob.response import EmptyResponse
124
req = Request.blank('/')
126
res = Response(request=req, conditional_response=True)
129
if_none_match = 'none'
130
if_modified_since = False
132
def __init__(self, env):
163
req = Request.blank('/',
167
res = Response(conditional_response=True)
134
168
def start_response(status, headerlist):
136
res.RequestClass = FakeRequest
137
result = res({}, start_response)
138
ok_(isinstance(result, EmptyResponse))
170
result = res(req.environ, start_response)
171
assert not list(result)
140
173
def test_HEAD_conditional_response_range_empty_response():
141
from webob.response import EmptyResponse
142
req = Request.blank('/')
144
res = Response(request=req, conditional_response=True)
146
res.body = 'Are we not men?'
147
res.content_length = len(res.body)
150
if_none_match = 'none'
151
if_modified_since = False
152
def __init__(self, env):
154
self.range = self # simulate inner api
156
def content_range(self, length):
162
def match_response(self, res):
163
"""if_range_match attr"""
165
def start_response(status, headerlist):
167
res.RequestClass = FakeRequest
168
result = res({}, start_response)
169
ok_(isinstance(result, EmptyResponse), result)
174
req = Request.blank('/',
178
res = Response('Are we not men?', conditional_response=True)
179
assert req.get_response(res).body == b''
171
182
def test_conditional_response_if_none_match_false():
172
183
req = Request.blank('/', if_none_match='foo')
173
184
resp = Response(app_iter=['foo\n'],
185
conditional_response=True, etag='bar')
186
resp = req.get_response(resp)
187
eq_(resp.status_code, 200)
189
def test_conditional_response_if_none_match_true():
190
req = Request.blank('/', if_none_match='foo')
191
resp = Response(app_iter=['foo\n'],
174
192
conditional_response=True, etag='foo')
175
193
resp = req.get_response(resp)
176
eq_(resp.status_int, 304)
178
def test_conditional_response_if_none_match_true():
179
req = Request.blank('/', if_none_match='foo')
180
resp = Response(app_iter=['foo\n'],
181
conditional_response=True, etag='bar')
182
resp = req.get_response(resp)
183
eq_(resp.status_int, 200)
194
eq_(resp.status_code, 304)
196
def test_conditional_response_if_none_match_weak():
197
req = Request.blank('/', headers={'if-none-match': '"bar"'})
198
req_weak = Request.blank('/', headers={'if-none-match': 'W/"bar"'})
199
resp = Response(app_iter=['foo\n'], conditional_response=True, etag='bar')
200
resp_weak = Response(app_iter=['foo\n'], conditional_response=True, headers={'etag': 'W/"bar"'})
201
for rq in [req, req_weak]:
202
for rp in [resp, resp_weak]:
203
rq.get_response(rp).status_code == 304
205
r2 = Response(app_iter=['foo\n'], conditional_response=True, headers={'etag': '"foo"'})
206
r2_weak = Response(app_iter=['foo\n'], conditional_response=True, headers={'etag': 'W/"foo"'})
207
req_weak.get_response(r2).status_code == 200
208
req.get_response(r2_weak) == 200
185
211
def test_conditional_response_if_modified_since_false():
186
212
from datetime import datetime, timedelta
196
222
resp = Response(app_iter=['foo\n'], conditional_response=True,
197
223
last_modified=req.if_modified_since+timedelta(seconds=1))
198
224
resp = req.get_response(resp)
199
eq_(resp.status_int, 200)
225
eq_(resp.status_code, 200)
201
227
def test_conditional_response_range_not_satisfiable_response():
202
228
req = Request.blank('/', range='bytes=100-200')
203
229
resp = Response(app_iter=['foo\n'], content_length=4,
204
230
conditional_response=True)
205
231
resp = req.get_response(resp)
206
eq_(resp.status_int, 416)
232
eq_(resp.status_code, 416)
207
233
eq_(resp.content_range.start, None)
208
234
eq_(resp.content_range.stop, None)
209
235
eq_(resp.content_range.length, 4)
210
eq_(resp.body, 'Requested range not satisfiable: bytes=100-200')
236
eq_(resp.body, b'Requested range not satisfiable: bytes=100-200')
212
238
def test_HEAD_conditional_response_range_not_satisfiable_response():
213
239
req = Request.blank('/', method='HEAD', range='bytes=100-200')
214
240
resp = Response(app_iter=['foo\n'], content_length=4,
215
241
conditional_response=True)
216
242
resp = req.get_response(resp)
217
eq_(resp.status_int, 416)
243
eq_(resp.status_code, 416)
218
244
eq_(resp.content_range.start, None)
219
245
eq_(resp.content_range.stop, None)
220
246
eq_(resp.content_range.length, 4)
223
def test_del_environ():
225
res.environ = {'yo': 'mama'}
226
eq_(res.environ, {'yo': 'mama'})
228
eq_(res.environ, None)
229
eq_(res.request, None)
231
def test_set_request_environ():
234
environ = {'jo': 'mama'}
235
res.request = FakeRequest
236
eq_(res.environ, {'jo': 'mama'})
237
eq_(res.request, FakeRequest)
239
eq_(res.environ, None)
240
eq_(res.request, None)
242
def test_del_request():
246
res.request = FakeRequest
248
eq_(res.environ, None)
249
eq_(res.request, None)
251
def test_set_environ_via_request_subterfuge():
253
def __init__(self, env):
256
res.RequestClass = FakeRequest
257
res.request = {'action': 'dwim'}
258
eq_(res.environ, {'action': 'dwim'})
259
ok_(isinstance(res.request, FakeRequest))
260
eq_(res.request.environ, res.environ)
262
def test_set_request():
265
environ = {'foo': 'bar'}
266
res.request = FakeRequest
267
eq_(res.request, FakeRequest)
268
eq_(res.environ, FakeRequest.environ)
270
eq_(res.environ, None)
271
eq_(res.request, None)
273
249
def test_md5_etag():
277
253
War was beginning.
278
254
Captain: What happen ?
296
272
ok_('\n' not in res.etag)
298
md5(res.body).digest().encode('base64').replace('\n', '').strip('='))
273
eq_(res.etag, 'pN8sSTUrEaPRzmurGptqmw')
299
274
eq_(res.content_md5, None)
301
276
def test_md5_etag_set_content_md5():
303
b = 'The quick brown fox jumps over the lazy dog'
304
res.md5_etag(b, set_content_md5=True)
306
md5(b).digest().encode('base64').replace('\n', '').strip('='))
278
body = b'The quick brown fox jumps over the lazy dog'
279
res.md5_etag(body, set_content_md5=True)
280
eq_(res.content_md5, 'nhB9nTcrtoJr2B01QqQZ1g==')
308
282
def test_decode_content_defaults_to_identity():
310
res.body = 'There be dragons'
284
res.body = b'There be dragons'
311
285
res.decode_content()
312
eq_(res.body, 'There be dragons')
286
eq_(res.body, b'There be dragons')
314
288
def test_decode_content_with_deflate():
290
body = b'Hey Hey Hey'
317
291
# Simulate inflate by chopping the headers off
318
292
# the gzip encoded data
319
res.body = zlib.compress(b)[2:-4]
293
res.body = zlib.compress(body)[2:-4]
320
294
res.content_encoding = 'deflate'
321
295
res.decode_content()
323
297
eq_(res.content_encoding, None)
325
299
def test_content_length():
328
302
req_head = Request.blank('/', method='HEAD')
329
303
r1 = req_head.get_response(r0)
330
eq_(r1.status_int, 200)
304
eq_(r1.status_code, 200)
332
306
eq_(r1.content_length, 10)
334
308
req_get = Request.blank('/')
335
309
r2 = req_get.get_response(r0)
336
eq_(r2.status_int, 200)
310
eq_(r2.status_code, 200)
311
eq_(r2.body, b'x'*10)
338
312
eq_(r2.content_length, 10)
340
r3 = Response(app_iter=['x']*10)
314
r3 = Response(app_iter=[b'x']*10)
341
315
eq_(r3.content_length, None)
316
eq_(r3.body, b'x'*10)
343
317
eq_(r3.content_length, 10)
345
r4 = Response(app_iter=['x']*10, content_length=20) # wrong content_length
319
r4 = Response(app_iter=[b'x']*10,
320
content_length=20) # wrong content_length
346
321
eq_(r4.content_length, 20)
347
322
assert_raises(AssertionError, lambda: r4.body)
349
324
req_range = Request.blank('/', range=(0,5))
350
325
r0.conditional_response = True
351
326
r5 = req_range.get_response(r0)
352
eq_(r5.status_int, 206)
353
eq_(r5.body, 'xxxxx')
327
eq_(r5.status_code, 206)
328
eq_(r5.body, b'xxxxx')
354
329
eq_(r5.content_length, 5)
356
331
def test_app_iter_range():
357
332
req = Request.blank('/', range=(2,5))
358
333
for app_iter in [
365
['012', '3', '4', '5'],
367
['0', '12', '34', '5'],
336
[b'0', b'1234', b'5'],
338
[b'01', b'234', b'5'],
339
[b'012', b'34', b'5'],
340
[b'012', b'3', b'4', b'5'],
341
[b'012', b'3', b'45'],
342
[b'0', b'12', b'34', b'5'],
343
[b'0', b'12', b'345'],
371
346
app_iter=app_iter,
400
375
def test_from_file():
401
376
res = Response('test')
403
res = Response(app_iter=iter(['test ', 'body']),
404
content_type='text/plain')
408
input_ = StringIO(str(res))
409
res2 = Response.from_file(input_)
377
inp = io.BytesIO(bytes_(str(res)))
380
def test_from_file2():
381
res = Response(app_iter=iter([b'test ', b'body']),
382
content_type='text/plain')
383
inp = io.BytesIO(bytes_(str(res)))
386
def test_from_text_file():
387
res = Response('test')
388
inp = io.StringIO(text_(str(res), 'utf-8'))
390
res = Response(app_iter=iter([b'test ', b'body']),
391
content_type='text/plain')
392
inp = io.StringIO(text_(str(res), 'utf-8'))
395
def equal_resp(res, inp):
396
res2 = Response.from_file(inp)
410
397
eq_(res.body, res2.body)
411
398
eq_(res.headers, res2.headers)
413
400
def test_from_file_w_leading_space_in_header():
414
401
# Make sure the removal of code dealing with leading spaces is safe
415
402
res1 = Response()
416
file_w_space = StringIO('200 OK\n\tContent-Type: text/html; charset=UTF-8')
403
file_w_space = io.BytesIO(
404
b'200 OK\n\tContent-Type: text/html; charset=UTF-8')
417
405
res2 = Response.from_file(file_w_space)
418
406
eq_(res1.headers, res2.headers)
420
408
def test_file_bad_header():
421
file_w_bh = StringIO('200 OK\nBad Header')
409
file_w_bh = io.BytesIO(b'200 OK\nBad Header')
422
410
assert_raises(ValueError, Response.from_file, file_w_bh)
424
412
def test_set_status():
426
res.status = u"OK 200"
427
eq_(res.status, "OK 200")
415
eq_(res.status, "200 OK")
428
416
assert_raises(TypeError, setattr, res, 'status', float(200))
430
418
def test_set_headerlist():
463
450
eq_(list(range), [])
465
452
def test_resp_write_app_iter_non_list():
466
res = Response(app_iter=('a','b'))
453
res = Response(app_iter=(b'a', b'b'))
467
454
eq_(res.content_length, None)
456
eq_(res.body, b'abc')
470
457
eq_(res.content_length, 3)
472
459
def test_response_file_body_writelines():
473
460
from webob.response import ResponseBodyFile
474
res = Response(app_iter=['foo'])
461
res = Response(app_iter=[b'foo'])
475
462
rbo = ResponseBodyFile(res)
476
463
rbo.writelines(['bar', 'baz'])
477
eq_(res.app_iter, ['foo', 'bar', 'baz'])
464
eq_(res.app_iter, [b'foo', b'bar', b'baz'])
478
465
rbo.flush() # noop
479
eq_(res.app_iter, ['foo', 'bar', 'baz'])
466
eq_(res.app_iter, [b'foo', b'bar', b'baz'])
481
468
def test_response_write_non_str():
483
470
assert_raises(TypeError, res.write, object())
485
472
def test_response_file_body_write_empty_app_iter():
486
from webob.response import ResponseBodyFile
487
473
res = Response('foo')
489
eq_(res.app_iter, ['foo', 'baz'])
475
eq_(res.app_iter, [b'foo', b'baz'])
491
477
def test_response_file_body_write_empty_body():
492
478
res = Response('')
494
eq_(res.app_iter, ['', 'baz'])
480
eq_(res.app_iter, [b'', b'baz'])
496
482
def test_response_file_body_close_not_implemented():
497
483
rbo = Response().body_file
510
496
assert_raises(AttributeError, res.__getattribute__, 'body')
512
498
def test_body_get_is_unicode_notverylong():
513
res = Response(app_iter=(u'foo',))
499
res = Response(app_iter=(text_(b'foo'),))
514
500
assert_raises(TypeError, res.__getattribute__, 'body')
516
502
def test_body_get_is_unicode():
517
res = Response(app_iter=(['x'] * 51 + [u'x']))
503
res = Response(app_iter=(['x'] * 51 + [text_(b'x')]))
518
504
assert_raises(TypeError, res.__getattribute__, 'body')
520
506
def test_body_set_not_unicode_or_str():
543
529
def test_unicode_body():
545
531
res.charset = 'utf-8'
546
bbody = 'La Pe\xc3\xb1a' # binary string
547
ubody = unicode(bbody, 'utf-8') # unicode string
532
bbody = b'La Pe\xc3\xb1a' # binary string
533
ubody = text_(bbody, 'utf-8') # unicode string
549
535
eq_(res.unicode_body, ubody)
550
536
res.ubody = ubody
551
537
eq_(res.body, bbody)
555
541
def test_text_get_decode():
557
543
res.charset = 'utf-8'
558
res.body = 'La Pe\xc3\xb1a'
559
eq_(res.text, unicode('La Pe\xc3\xb1a', 'utf-8'))
544
res.body = b'La Pe\xc3\xb1a'
545
eq_(res.text, text_(b'La Pe\xc3\xb1a', 'utf-8'))
561
547
def test_text_set_no_charset():
568
554
res.charset = 'utf-8'
569
555
assert_raises(TypeError, res.__setattr__, 'text',
572
558
def test_text_del():
573
559
res = Response('123')
576
562
eq_(res.content_length, 0)
578
564
def test_body_file_del():
581
567
eq_(res.content_length, 3)
582
eq_(res.app_iter, ['123'])
568
eq_(res.app_iter, [b'123'])
583
569
del res.body_file
585
571
eq_(res.content_length, 0)
587
573
def test_write_unicode():
589
res.text = unicode('La Pe\xc3\xb1a', 'utf-8')
591
eq_(res.text, unicode('La Pe\xc3\xb1aa', 'utf-8'))
575
res.text = text_(b'La Pe\xc3\xb1a', 'utf-8')
576
res.write(text_(b'a'))
577
eq_(res.text, text_(b'La Pe\xc3\xb1aa', 'utf-8'))
593
579
def test_write_unicode_no_charset():
594
580
res = Response(charset=None)
595
assert_raises(TypeError, res.write, u'a')
581
assert_raises(TypeError, res.write, text_(b'a'))
597
583
def test_write_text():
586
res.write(text_(b'a'))
601
587
eq_(res.text, 'abca')
603
589
def test_app_iter_del():
892
881
def test_body_file_write_no_charset():
894
assert_raises(TypeError, res.write, u'foo')
883
assert_raises(TypeError, res.write, text_('foo'))
896
885
def test_body_file_write_unicode_encodes():
897
from webob.response import ResponseBodyFile
898
s = unicode('La Pe\xc3\xb1a', 'utf-8')
886
s = text_(b'La Pe\xc3\xb1a', 'utf-8')
901
eq_(res.app_iter, ['', 'La Pe\xc3\xb1a'])
889
eq_(res.app_iter, [b'', b'La Pe\xc3\xb1a'])
948
936
def test_encode_content_gzip_notyet_gzipped():
950
res.app_iter = StringIO('foo')
938
res.app_iter = io.BytesIO(b'foo')
951
939
result = res.encode_content('gzip')
952
940
eq_(result, None)
953
941
eq_(res.content_length, 23)
954
eq_(res.app_iter, ['\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff', '',
955
'K\xcb\xcf\x07\x00', '!es\x8c\x03\x00\x00\x00'])
943
b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff',
945
b'K\xcb\xcf\x07\x00',
946
b'!es\x8c\x03\x00\x00\x00'
957
949
def test_encode_content_gzip_notyet_gzipped_lazy():
959
res.app_iter = StringIO('foo')
951
res.app_iter = io.BytesIO(b'foo')
960
952
result = res.encode_content('gzip', lazy=True)
961
953
eq_(result, None)
962
954
eq_(res.content_length, None)
963
eq_(list(res.app_iter), ['\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff', '',
964
'K\xcb\xcf\x07\x00', '!es\x8c\x03\x00\x00\x00'])
955
eq_(list(res.app_iter), [
956
b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff',
958
b'K\xcb\xcf\x07\x00',
959
b'!es\x8c\x03\x00\x00\x00'
966
962
def test_decode_content_identity():
994
990
result = res._abs_headerlist({})
995
991
eq_(result, [('Location', 'http:')])
997
def test_response_set_body_file():
998
for data in ['abc', 'abcdef'*1024]:
999
file = StringIO(data)
1000
r = Response(body_file=file)
1001
assert r.body == data
993
def test__abs_headerlist_location_no_scheme():
995
res.content_encoding = 'gzip'
996
res.headerlist = [('Location', '/abc')]
997
result = res._abs_headerlist({'wsgi.url_scheme':'http',
998
'HTTP_HOST':'example.com:80'})
999
eq_(result, [('Location', 'http://example.com/abc')])
1001
def test_response_set_body_file1():
1003
file = io.BytesIO(data)
1004
r = Response(body_file=file)
1005
assert r.body == data
1007
def test_response_set_body_file2():
1008
data = b'abcdef'*1024
1009
file = io.BytesIO(data)
1010
r = Response(body_file=file)
1011
assert r.body == data
1013
def test_response_json_body():
1014
r = Response(json_body={'a': 1})
1015
assert r.body == b'{"a":1}', repr(r.body)
1016
assert r.content_type == 'application/json'
1018
r.json_body = {"b": 1}
1019
assert r.content_type == 'text/html'
1021
assert r.body == b''
1023
def test_cache_expires_set_zero_then_nonzero():
1025
res.cache_expires(seconds=0)
1026
res.cache_expires(seconds=1)
1027
eq_(res.pragma, None)
1028
ok_(not res.cache_control.no_cache)
1029
ok_(not res.cache_control.no_store)
1030
ok_(not res.cache_control.must_revalidate)
1031
eq_(res.cache_control.max_age, 1)