10
from test import support
11
threading = support.import_module('threading')
17
here = os.path.dirname(__file__)
18
# Self-signed cert file for 'localhost'
19
CERT_localhost = os.path.join(here, 'keycert.pem')
20
# Self-signed cert file for 'fakehostname'
21
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24
# Loopback http server infrastructure
26
class LoopbackHttpServer(http.server.HTTPServer):
27
"""HTTP server w/ a few modifications that make it useful for
28
loopback testing purposes.
31
def __init__(self, server_address, RequestHandlerClass):
32
http.server.HTTPServer.__init__(self,
36
# Set the timeout of our listening socket really low so
37
# that we can stop the server easily.
38
self.socket.settimeout(0.1)
40
def get_request(self):
    """Accept one connection and put a timeout on the accepted socket.

    Overrides the HTTPServer method of the same name.

    Returns:
        The ``(request, client_address)`` pair obtained from
        ``self.socket.accept()``.
    """
    request, client_address = self.socket.accept()

    # It's a loopback connection, so a finite (10 second) timeout
    # shouldn't affect anything, but should make deadlocks less
    # likely to occur.
    request.settimeout(10.0)

    return (request, client_address)
52
class LoopbackHttpServerThread(threading.Thread):
53
"""Stoppable thread that runs a loopback http server."""
55
def __init__(self, request_handler):
56
threading.Thread.__init__(self)
57
self._stop_server = False
58
self.ready = threading.Event()
59
request_handler.protocol_version = "HTTP/1.0"
60
self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
62
#print "Serving HTTP on %s port %s" % (self.httpd.server_name,
63
# self.httpd.server_port)
64
self.port = self.httpd.server_port
67
"""Stops the webserver if it's currently running."""
70
self._stop_server = True
73
self.httpd.server_close()
77
while not self._stop_server:
78
self.httpd.handle_request()
80
# Authentication infrastructure
82
class DigestAuthHandler:
83
"""Handler for performing digest authentication."""
89
self._realm_name = "Test Realm"
92
def set_qop(self, qop):
95
def set_users(self, users):
96
assert isinstance(users, dict)
99
def set_realm(self, realm):
    """Store *realm* as the realm name kept in ``self._realm_name``."""
    self._realm_name = realm
102
def _generate_nonce(self):
103
self._request_num += 1
104
nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
105
self._nonces.append(nonce)
108
def _create_auth_dict(self, auth_str):
109
first_space_index = auth_str.find(" ")
110
auth_str = auth_str[first_space_index+1:]
112
parts = auth_str.split(",")
116
name, value = part.split("=")
118
if value[0] == '"' and value[-1] == '"':
121
value = value.strip()
122
auth_dict[name] = value
125
def _validate_auth(self, auth_dict, password, method, uri):
127
final_dict.update(auth_dict)
128
final_dict["password"] = password
129
final_dict["method"] = method
130
final_dict["uri"] = uri
131
HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
132
HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
133
HA2_str = "%(method)s:%(uri)s" % final_dict
134
HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
135
final_dict["HA1"] = HA1
136
final_dict["HA2"] = HA2
137
response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
138
"%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
139
response = hashlib.md5(response_str.encode("ascii")).hexdigest()
141
return response == auth_dict["response"]
143
def _return_auth_challenge(self, request_handler):
144
request_handler.send_response(407, "Proxy Authentication Required")
145
request_handler.send_header("Content-Type", "text/html")
146
request_handler.send_header(
147
'Proxy-Authenticate', 'Digest realm="%s", '
150
(self._realm_name, self._qop, self._generate_nonce()))
151
# XXX: Not sure if we're supposed to add this next header or
153
#request_handler.send_header('Connection', 'close')
154
request_handler.end_headers()
155
request_handler.wfile.write(b"Proxy Authentication Required.")
158
def handle_request(self, request_handler):
159
"""Performs digest authentication on the given HTTP request
160
handler. Returns True if authentication was successful, False
163
If no users have been set, then digest auth is effectively
164
disabled and this method will always return True.
167
if len(self._users) == 0:
170
if "Proxy-Authorization" not in request_handler.headers:
171
return self._return_auth_challenge(request_handler)
173
auth_dict = self._create_auth_dict(
174
request_handler.headers["Proxy-Authorization"]
176
if auth_dict["username"] in self._users:
177
password = self._users[ auth_dict["username"] ]
179
return self._return_auth_challenge(request_handler)
180
if not auth_dict.get("nonce") in self._nonces:
181
return self._return_auth_challenge(request_handler)
183
self._nonces.remove(auth_dict["nonce"])
185
auth_validated = False
187
# MSIE uses short_path in its validation, but Python's
188
# urllib.request uses the full path, so we're going to see if
189
# either of them works here.
191
for path in [request_handler.path, request_handler.short_path]:
192
if self._validate_auth(auth_dict,
194
request_handler.command,
196
auth_validated = True
198
if not auth_validated:
199
return self._return_auth_challenge(request_handler)
202
# Proxy test infrastructure
204
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
205
"""This is a 'fake proxy' that makes it look like the entire
206
internet has gone down due to a sudden zombie invasion. Its main
207
utility is in providing us with authentication support for
211
def __init__(self, digest_auth_handler, *args, **kwargs):
    """Attach the digest-auth helper, then run the base handler's init.

    Args:
        digest_auth_handler: object stored as ``self.digest_auth_handler``.
        *args, **kwargs: passed through unchanged to
            ``http.server.BaseHTTPRequestHandler.__init__``.
    """
    # This has to be set before calling our parent's __init__(), which will
    # try to call do_GET().
    self.digest_auth_handler = digest_auth_handler
    http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
217
def log_message(self, format, *args):
218
# Uncomment the next line for debugging.
219
# sys.stderr.write(format % args)
223
(scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
225
self.short_path = path
226
if self.digest_auth_handler.handle_request(self):
227
self.send_response(200, "OK")
228
self.send_header("Content-Type", "text/html")
230
self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
232
self.wfile.write(b"Our apologies, but our server is down due to "
233
b"a sudden zombie invasion.")
237
class ProxyAuthTests(unittest.TestCase):
238
URL = "http://localhost"
245
super(ProxyAuthTests, self).setUp()
246
self.digest_auth_handler = DigestAuthHandler()
247
self.digest_auth_handler.set_users({self.USER: self.PASSWD})
248
self.digest_auth_handler.set_realm(self.REALM)
249
def create_fake_proxy_handler(*args, **kwargs):
250
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
252
self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
254
self.server.ready.wait()
255
proxy_url = "http://127.0.0.1:%d" % self.server.port
256
handler = urllib.request.ProxyHandler({"http" : proxy_url})
257
self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
258
self.opener = urllib.request.build_opener(
259
handler, self.proxy_digest_handler)
263
super(ProxyAuthTests, self).tearDown()
265
def test_proxy_with_bad_password_raises_httperror(self):
266
self.proxy_digest_handler.add_password(self.REALM, self.URL,
267
self.USER, self.PASSWD+"bad")
268
self.digest_auth_handler.set_qop("auth")
269
self.assertRaises(urllib.error.HTTPError,
273
def test_proxy_with_no_password_raises_httperror(self):
274
self.digest_auth_handler.set_qop("auth")
275
self.assertRaises(urllib.error.HTTPError,
279
def test_proxy_qop_auth_works(self):
280
self.proxy_digest_handler.add_password(self.REALM, self.URL,
281
self.USER, self.PASSWD)
282
self.digest_auth_handler.set_qop("auth")
283
result = self.opener.open(self.URL)
288
def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
289
self.proxy_digest_handler.add_password(self.REALM, self.URL,
290
self.USER, self.PASSWD)
291
self.digest_auth_handler.set_qop("auth-int")
293
result = self.opener.open(self.URL)
294
except urllib.error.URLError:
295
# It's okay if we don't support auth-int, but we certainly
296
# shouldn't receive any kind of exception here other than
305
def GetRequestHandler(responses):
307
class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
309
server_version = "TestHTTP/"
311
headers_received = []
315
body = self.send_head()
317
done = self.wfile.write(body)
321
content_length = self.headers["Content-Length"]
322
post_data = self.rfile.read(int(content_length))
324
self.requests.append(post_data)
327
FakeHTTPRequestHandler.headers_received = self.headers
328
self.requests.append(self.path)
329
response_code, headers, body = responses.pop(0)
331
self.send_response(response_code)
333
for (header, value) in headers:
334
self.send_header(header, value % {'port':self.port})
336
self.send_header("Content-type", "text/plain")
341
def log_message(self, *args):
345
return FakeHTTPRequestHandler
348
class TestUrlopen(unittest.TestCase):
349
"""Tests urllib.request.urlopen using the network.
351
These tests are not exhaustive. Assuming that testing using files does a
352
good job overall of some of the basic interface features. There are no
353
tests exercising the optional 'data' and 'proxies' arguments. No tests
354
for transparent redirection have been written.
358
super(TestUrlopen, self).setUp()
359
# Ignore proxies for localhost tests.
360
self.old_environ = os.environ.copy()
361
os.environ['NO_PROXY'] = '*'
365
if self.server is not None:
368
os.environ.update(self.old_environ)
369
super(TestUrlopen, self).tearDown()
371
def urlopen(self, url, data=None, **kwargs):
373
f = urllib.request.urlopen(url, data, **kwargs)
375
# Exercise various methods
376
l.extend(f.readlines(200))
377
l.append(f.readline())
378
l.append(f.read(1024))
384
def start_server(self, responses=None):
385
if responses is None:
386
responses = [(200, [], b"we don't care")]
387
handler = GetRequestHandler(responses)
389
self.server = LoopbackHttpServerThread(handler)
391
self.server.ready.wait()
392
port = self.server.port
396
def start_https_server(self, responses=None, **kwargs):
397
if not hasattr(urllib.request, 'HTTPSHandler'):
398
self.skipTest('ssl support required')
399
from test.ssl_servers import make_https_server
400
if responses is None:
401
responses = [(200, [], b"we care a bit")]
402
handler = GetRequestHandler(responses)
403
server = make_https_server(self, handler_class=handler, **kwargs)
404
handler.port = server.port
407
def test_redirection(self):
408
expected_response = b"We got here..."
410
(302, [("Location", "http://localhost:%(port)s/somewhere_else")],
412
(200, [], expected_response)
415
handler = self.start_server(responses)
416
data = self.urlopen("http://localhost:%s/" % handler.port)
417
self.assertEqual(data, expected_response)
418
self.assertEqual(handler.requests, ["/", "/somewhere_else"])
420
def test_chunked(self):
421
expected_response = b"hello world"
429
response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
430
handler = self.start_server(response)
431
data = self.urlopen("http://localhost:%s/" % handler.port)
432
self.assertEqual(data, expected_response)
435
expected_response = b"Bad bad bad..."
436
handler = self.start_server([(404, [], expected_response)])
439
self.urlopen("http://localhost:%s/weeble" % handler.port)
440
except urllib.error.URLError as f:
444
self.fail("404 should raise URLError")
446
self.assertEqual(data, expected_response)
447
self.assertEqual(handler.requests, ["/weeble"])
450
expected_response = b"pycon 2008..."
451
handler = self.start_server([(200, [], expected_response)])
452
data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
453
self.assertEqual(data, expected_response)
454
self.assertEqual(handler.requests, ["/bizarre"])
456
def test_200_with_parameters(self):
457
expected_response = b"pycon 2008..."
458
handler = self.start_server([(200, [], expected_response)])
459
data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
461
self.assertEqual(data, expected_response)
462
self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
464
def test_https(self):
    """Fetch a URL from the server started by start_https_server().

    The body read back through self.urlopen() must equal
    b"we care a bit".
    """
    handler = self.start_https_server()
    data = self.urlopen("https://localhost:%s/bizarre" % handler.port)
    self.assertEqual(data, b"we care a bit")
469
def test_https_with_cafile(self):
470
handler = self.start_https_server(certfile=CERT_localhost)
473
data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
474
cafile=CERT_localhost)
475
self.assertEqual(data, b"we care a bit")
477
with self.assertRaises(urllib.error.URLError) as cm:
478
self.urlopen("https://localhost:%s/bizarre" % handler.port,
479
cafile=CERT_fakehostname)
480
# Good cert, but mismatching hostname
481
handler = self.start_https_server(certfile=CERT_fakehostname)
482
with self.assertRaises(ssl.CertificateError) as cm:
483
self.urlopen("https://localhost:%s/bizarre" % handler.port,
484
cafile=CERT_fakehostname)
486
def test_https_with_cadefault(self):
487
handler = self.start_https_server(certfile=CERT_localhost)
488
# Self-signed cert should fail verification with system certificate store
489
with self.assertRaises(urllib.error.URLError) as cm:
490
self.urlopen("https://localhost:%s/bizarre" % handler.port,
493
def test_https_sni(self):
495
self.skipTest("ssl module required")
497
self.skipTest("SNI support required in OpenSSL")
499
def cb_sni(ssl_sock, server_name, initial_context):
501
sni_name = server_name
502
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
503
context.set_servername_callback(cb_sni)
504
handler = self.start_https_server(context=context, certfile=CERT_localhost)
505
self.urlopen("https://localhost:%s" % handler.port)
506
self.assertEqual(sni_name, "localhost")
508
def test_sending_headers(self):
    """Check that a header set on the Request is seen by the server."""
    handler = self.start_server()
    req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                 headers={"Range": "bytes=20-39"})
    # Only the request matters here; close the response right away so
    # the connection is not left open.
    with urllib.request.urlopen(req):
        pass
    self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
515
def test_basic(self):
516
handler = self.start_server()
517
open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
518
for attr in ("read", "close", "info", "geturl"):
519
self.assertTrue(hasattr(open_url, attr), "object returned from "
520
"urlopen lacks the %s attribute" % attr)
522
self.assertTrue(open_url.read(), "calling 'read' failed")
527
handler = self.start_server()
529
open_url = urllib.request.urlopen(
530
"http://localhost:%s" % handler.port)
531
info_obj = open_url.info()
532
self.assertIsInstance(info_obj, email.message.Message,
533
"object returned by 'info' is not an "
534
"instance of email.message.Message")
535
self.assertEqual(info_obj.get_content_subtype(), "plain")
539
def test_geturl(self):
    """Make sure geturl() returns the same URL that was opened."""
    handler = self.start_server()
    requested = "http://localhost:%s" % handler.port
    # Close the response once the URL has been read from it.
    with urllib.request.urlopen(requested) as open_url:
        url = open_url.geturl()
    self.assertEqual(url, requested)
546
def test_bad_address(self):
    """Make sure OSError is raised when connecting to a bogus address."""
    # As indicated by the comment below, this might fail with some ISPs,
    # so we run the test only when -unetwork/-uall is specified to
    # mitigate the problem a bit (see #17564).
    support.requires('network')

    # Given that both VeriSign and various ISPs have in the past or are
    # presently hijacking various invalid domain name requests in an
    # attempt to boost traffic to their own sites, finding a domain name
    # to use for this test is difficult.  RFC2606 leads one to believe
    # that '.invalid' should work, but experience seemed to indicate
    # otherwise.  Single character TLDs are likely to remain invalid, so
    # this seems to be the best choice.  The trailing '.' prevents a
    # related problem: the normal DNS resolver appends the domain names
    # from the search path if there is no '.' at the end, and if one of
    # those domains implements a '*' rule, a result is returned.
    # However, none of this will prevent the test from failing if the
    # ISP hijacks all invalid domain requests.  The real solution would
    # be to be able to parameterize the framework with a mock resolver.
    with self.assertRaises(OSError):
        urllib.request.urlopen("http://sadflkjsasf.i.nvali.d./")
575
def test_iteration(self):
576
expected_response = b"pycon 2008..."
577
handler = self.start_server([(200, [], expected_response)])
578
data = urllib.request.urlopen("http://localhost:%s" % handler.port)
580
self.assertEqual(line, expected_response)
582
def test_line_iteration(self):
    """Iterate over a response line by line and compare every line.

    The payload ends with one very long line in addition to the short
    ones.
    """
    lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
    expected_response = b"".join(lines)
    handler = self.start_server([(200, [], expected_response)])

    # Count the lines explicitly so that an empty response is reported as
    # an assertion failure rather than a NameError on an unbound loop
    # variable.
    fetched = 0
    with urllib.request.urlopen(
            "http://localhost:%s" % handler.port) as data:
        for index, line in enumerate(data):
            self.assertEqual(line, lines[index],
                             "Fetched line number %s doesn't match expected:\n"
                             "    Expected length was %s, got %s" %
                             (index, len(lines[index]), len(line)))
            fetched = index + 1
    self.assertEqual(fetched, len(lines))
595
@support.reap_threads
597
support.run_unittest(ProxyAuthTests, TestUrlopen)
599
if __name__ == "__main__":