~ubuntu-branches/ubuntu/trusty/python3.4/trusty-proposed

« back to all changes in this revision

Viewing changes to Lib/test/test_urllib2_localnet.py

  • Committer: Package Import Robot
  • Author(s): Matthias Klose
  • Date: 2013-11-25 09:44:27 UTC
  • Revision ID: package-import@ubuntu.com-20131125094427-lzxj8ap5w01lmo7f
Tags: upstream-3.4~b1
Import upstream version 3.4~b1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python3
 
2
 
 
3
import os
 
4
import email
 
5
import urllib.parse
 
6
import urllib.request
 
7
import http.server
 
8
import unittest
 
9
import hashlib
 
10
from test import support
 
11
threading = support.import_module('threading')
 
12
try:
 
13
    import ssl
 
14
except ImportError:
 
15
    ssl = None
 
16
 
 
17
# Directory containing this test module; the certificate files are looked
# up relative to it.
here = os.path.split(__file__)[0]
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
 
22
 
 
23
 
 
24
# Loopback http server infrastructure
 
25
 
 
26
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant adjusted for loopback testing.

    The listening socket uses a short timeout and every accepted
    connection gets its own I/O timeout.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # With a short timeout on the listening socket, accept() gives up
        # quickly, which makes the server easy to stop.
        self.socket.settimeout(0.1)

    def get_request(self):
        """HTTPServer method, overridden: accept one connection.

        Returns the ``(socket, client_address)`` pair produced by
        ``accept()`` after putting a timeout on the new socket.
        """
        connection, peer_address = self.socket.accept()

        # Bound blocking I/O on the accepted connection to 10 seconds so
        # that deadlocks are less likely to occur.
        connection.settimeout(10.0)

        return connection, peer_address
 
51
 
 
52
class LoopbackHttpServerThread(threading.Thread):
    """Thread that runs a LoopbackHttpServer until stop() is called."""

    def __init__(self, request_handler):
        super().__init__()
        # Polled by run() between requests; flipped by stop().
        self._stop_server = False
        # Set by run() just before it enters its serving loop.
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 is passed as the port; the port actually in use is read
        # back from the server and published as self.port.
        loopback_address = ("127.0.0.1", 0)
        self.httpd = LoopbackHttpServer(loopback_address, request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""
        # Raise the stop flag, wait for run() to return, then release the
        # listening socket.
        self._stop_server = True
        self.join()
        self.httpd.server_close()

    def run(self):
        """Signal readiness, then handle requests until asked to stop."""
        self.ready.set()
        while True:
            if self._stop_server:
                break
            self.httpd.handle_request()
 
79
 
 
80
# Authentication infrastructure
 
81
 
 
82
class DigestAuthHandler:
    """Handler for performing digest authentication on proxy requests.

    The handler issues one-time nonces in 407 challenges and checks the
    ``Proxy-Authorization`` header of follow-up requests against the
    configured users.  Malformed or incomplete credentials are answered
    with a fresh challenge rather than an exception.
    """

    def __init__(self):
        self._request_num = 0
        # Nonces that were handed out and have not been used yet.
        self._nonces = []
        # Maps user name -> password; empty means authentication is off.
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Replace the user table with *users* (dict of name -> password)."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a new nonce.

        The nonce is the MD5 hex digest of a per-handler request counter.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    @staticmethod
    def _split_params(params):
        """Split *params* on commas that are not inside double quotes."""
        parts = []
        current = []
        in_quotes = False
        for char in params:
            if char == '"':
                in_quotes = not in_quotes
            if char == "," and not in_quotes:
                parts.append("".join(current))
                current = []
            else:
                current.append(char)
        parts.append("".join(current))
        return parts

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        Everything up to the first space (the scheme) is discarded; the
        rest is read as comma-separated ``name=value`` pairs.  Names and
        values are stripped of surrounding whitespace and one pair of
        enclosing double quotes is removed from values.  Segments without
        an ``=`` (for example after a trailing comma) are ignored, and
        only the first ``=`` separates name from value so that values
        such as URIs with query strings survive intact.
        """
        params = auth_str[auth_str.find(" ") + 1:]

        auth_dict = {}
        for part in self._split_params(params):
            name, separator, value = part.partition("=")
            if not separator:
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the expected digest.

        The expected value is computed with MD5 from the user name, realm,
        nonce, nc, cnonce and qop found in *auth_dict* together with the
        given *password*, *method* and *uri*.  Returns False when any of
        the required fields is missing.
        """
        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        try:
            HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
            HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
            HA2_str = "%(method)s:%(uri)s" % final_dict
            HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
            final_dict["HA1"] = HA1
            final_dict["HA2"] = HA2
            response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                           "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        except KeyError:
            # Incomplete credentials can never validate.
            return False
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict.get("response")

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # NOTE(review): the header value ends with a trailing ", "; it is
        # kept unchanged here.
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add a 'Connection: close'
        # header here or not.
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned, a 407 challenge has already been
        written to *request_handler*.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"])

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # Each nonce is accepted once only.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so we're going to see if
        # either of them works here.
        candidate_paths = (request_handler.path, request_handler.short_path)
        if any(self._validate_auth(auth_dict, password,
                                   request_handler.command, path)
               for path in candidate_paths):
            return True
        return self._return_auth_challenge(request_handler)
 
201
 
 
202
# Proxy test infrastructure
 
203
 
 
204
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """A 'fake proxy' that answers every request with the same page,
    claiming the server is down due to a sudden zombie invasion.  Its
    main utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # Assign this before delegating to the parent's __init__(), which
        # will try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        # Logging is silenced.  For debugging, write ``format % args``
        # to sys.stderr here.
        pass

    def do_GET(self):
        """Authenticate the request and, on success, send the fixed page."""
        # Only the path component is kept; the digest handler tries it as
        # an alternative to the full request path.
        self.short_path = urllib.parse.urlparse(self.path, "http").path
        if not self.digest_auth_handler.handle_request(self):
            return
        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
 
234
 
 
235
# Test cases
 
236
 
 
237
class ProxyAuthTests(unittest.TestCase):
    """Digest proxy authentication tests run against a local fake proxy.

    Each test starts a FakeProxyHandler server on the loopback interface
    and opens URL through it with a ProxyDigestAuthHandler.
    """

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()

        # Ignore proxy bypass settings in the environment: a NO_PROXY
        # entry matching 'localhost' would make urllib skip the fake
        # proxy entirely and defeat these tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        # The server instantiates the handler itself, so bind the shared
        # digest handler through a factory function.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        # The context manager closes the response even if read() raises.
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            with result:
                while result.read():
                    pass
 
303
 
 
304
 
 
305
def GetRequestHandler(responses):
    """Return a new request-handler class that replays *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples.
    Every handled request pops the first remaining tuple and sends it.
    Header values are ``%``-formatted with ``{'port': <handler port>}``.
    The returned class records request paths (and POST bodies) in its
    ``requests`` list and the most recent request headers in
    ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            # write() reports how many bytes it took; keep writing the
            # remainder until the whole body has been sent.
            remaining = self.send_head()
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            length = int(self.headers["Content-Length"])
            payload = self.rfile.read(length)
            self.do_GET()
            self.requests.append(payload)

        def send_head(self):
            """Send status line and headers; return the body, if any."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_list, payload = responses.pop(0)

            self.send_response(status)

            substitutions = {'port':self.port}
            for name, template in header_list:
                self.send_header(name, template % substitutions)
            if payload:
                self.send_header("Content-type", "text/plain")
            self.end_headers()
            return payload if payload else None

        def log_message(self, *args):
            pass

    return FakeHTTPRequestHandler
 
346
 
 
347
 
 
348
class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen against loopback HTTP(S) servers.

    These tests are not exhaustive.  Each test starts a server that
    replays canned responses (see start_server / start_https_server) and
    checks what urlopen returns for them.
    """

    def setUp(self):
        super(TestUrlopen, self).setUp()
        # Ignore proxies for localhost tests.
        self.old_environ = os.environ.copy()
        os.environ['NO_PROXY'] = '*'
        self.server = None

    def tearDown(self):
        if self.server is not None:
            self.server.stop()
        os.environ.clear()
        os.environ.update(self.old_environ)
        super(TestUrlopen, self).tearDown()

    def urlopen(self, url, data=None, **kwargs):
        """Open *url*, read it through several methods, return all bytes."""
        chunks = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses*.

        Returns the handler class; its ``port`` attribute is set to the
        port the server listens on.
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses*; skip without SSL.

        Extra keyword arguments are passed on to make_https_server.
        Returns the handler class with ``port`` set to the server port.
        """
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # start_https_server() has already skipped the test when SSL
        # support is missing, so this import happens only with SSL.
        import ssl
        # Good cert
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                            cafile=CERT_localhost)
        self.assertEqual(data, b"we care a bit")
        # Bad cert
        with self.assertRaises(urllib.error.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)
        # Good cert, but mismatching hostname
        handler = self.start_https_server(certfile=CERT_fakehostname)
        with self.assertRaises(ssl.CertificateError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with self.assertRaises(urllib.error.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        # Only the request matters here; close the response right away.
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        # The context manager closes the response even when an assertion
        # below fails.
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        # The server itself is stopped by tearDown(); only the response
        # needs closing here.
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        support.requires('network')

        # Given that both VeriSign and various ISPs have in the past or
        # are presently hijacking various invalid domain name requests in
        # an attempt to boost traffic to their own sites, finding a
        # domain name to use for this test is difficult.  RFC2606 leads
        # one to believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character TLDs are likely
        # to remain invalid, so this seems to be the best choice.  The
        # trailing '.' prevents a related problem: the normal DNS
        # resolver appends the domain names from the search path if there
        # is no '.' at the end, and if one of those domains implements a
        # '*' rule a result is returned.  However, none of this will
        # prevent the test from failing if the ISP hijacks all invalid
        # domain requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        self.assertRaises(OSError,
                          urllib.request.urlopen,
                          "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Start at -1 so that an empty response fails the final count
        # assertion instead of raising NameError.
        index = -1
        with urllib.request.urlopen(
                "http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        self.assertEqual(index + 1, len(lines))
 
593
 
 
594
 
 
595
@support.reap_threads
def test_main():
    """Run every test case class of this module via support.run_unittest."""
    test_classes = (ProxyAuthTests, TestUrlopen)
    support.run_unittest(*test_classes)


if __name__ == "__main__":
    test_main()