1
"""Tests for ClientCookie._urllib2_support."""
4
# Request (I'm too lazy)
5
# CacheFTPHandler (hard to write)
6
# parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter
7
# and Greg Stein, since they're doing Digest Authentication)
8
# Authentication stuff (ditto)
9
# ProxyHandler, CustomProxy, CustomProxyHandler (I don't use a proxy)
10
# GopherHandler (haven't used gopher for a decade or so...)
12
import unittest, StringIO, os, sys, UserDict
15
from ClientCookie._urllib2_support import Request, AbstractHTTPHandler, \
16
build_opener, parse_head, urlopen
17
from ClientCookie._Util import startswith
18
from ClientCookie import HTTPRedirectHandler, HTTPRequestUpgradeProcessor, \
19
HTTPEquivProcessor, HTTPRefreshProcessor, SeekableProcessor, \
20
HTTPCookieProcessor, HTTPRefererProcessor, HTTPRobotRulesProcessor, \
21
HTTPErrorProcessor, HTTPHandler
22
from ClientCookie import OpenerDirector
31
def open(self, req, data=None):
    """Record the request and its payload on the instance; nothing is fetched."""
    self.req = req
    self.data = data
33
def error(self, proto, *args):
    """Remember the protocol name and the remaining positional arguments."""
    self.proto = proto
    self.args = args
37
def read(self, count=None):
    """Stub read: ``count`` is ignored and None is returned."""
    return None
38
def readline(self, count=None):
    """Stub readline: ``count`` is ignored and None is returned."""
    return None
41
class MockHeaders(UserDict.UserDict):
42
def getallmatchingheaders(self, name):
44
for k, v in self.data.items():
45
r.append("%s: %s" % (k, v))
48
class MockResponse(StringIO.StringIO):
    """StringIO-backed response object carrying status attributes."""

    def __init__(self, code, msg, headers, data, url=None):
        # The body text is served through the StringIO base class.
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url
58
def add_cookie_header(self, request, unverifiable=False):
    """Record the arguments of the call on the instance for later inspection."""
    self.ach_req = request
    self.ach_u = unverifiable
60
def extract_cookies(self, response, request, unverifiable=False):
    """Record the request, response and flag of the call on the instance."""
    self.ec_req = request
    self.ec_r = response
    self.ec_u = unverifiable
64
def __init__(self, meth_name, action, handle):
65
self.meth_name = meth_name
68
def __call__(self, *args):
    """Forward the call to ``self.handle``.

    ``self.handle`` receives the stored method name and action first,
    followed by the positional arguments of this call; its result is
    returned unchanged.
    """
    # Extended call syntax replaces the deprecated apply() builtin.
    return self.handle(self.meth_name, self.action, *args)
73
def __init__(self, methods):
    """Set up the instance by passing ``methods`` to ``_define_methods``."""
    self._define_methods(methods)
75
def _define_methods(self, methods):
77
if len(spec) == 2: name, action = spec
78
else: name, action = spec, None
79
meth = MockMethod(name, action, self.handle)
80
setattr(self.__class__, name, meth)
81
def handle(self, fn_name, action, *args, **kwds):
82
self.parent.calls.append((self, fn_name, args, kwds))
85
elif action == "return self":
87
elif action == "return response":
88
res = MockResponse(200, "OK", {}, "")
90
elif action == "return request":
91
return Request("http://blah/")
92
elif startswith(action, "error"):
93
code = int(action[-3:])
94
res = MockResponse(200, "OK", {}, "")
95
return self.parent.error("http", args[0], res, code, "", {})
96
elif action == "raise":
97
raise urllib2.URLError("blah")
100
def add_parent(self, parent):
102
self.parent.calls = []
103
def __cmp__(self, other):
104
if hasattr(other, "handler_order"):
105
return cmp(self.handler_order, other.handler_order)
106
# No handler_order, leave in original order. Yuck.
108
#return cmp(id(self), id(other))
111
def add_ordered_mock_handlers(opener, meth_spec):
114
for meths in meth_spec:
115
class MockHandlerSubclass(MockHandler): pass
116
h = MockHandlerSubclass(meths)
117
h.handler_order = h.processor_order = count
121
opener.add_handler(h)
124
class OpenerDirectorTests(unittest.TestCase):
126
def test_handled(self):
127
# handler returning non-None means no more handlers will be called
130
["http_open", "ftp_open", "http_error_302"],
132
[("http_open", "return self")],
133
[("http_open", "return self")],
135
handlers = add_ordered_mock_handlers(o, meth_spec)
137
req = Request("http://example.com/")
139
# Second http_open gets called, third doesn't, since second returned
140
# non-None. Handlers without http_open never get any methods called
142
# In fact, second mock handler returns self (instead of response),
143
# which becomes the OpenerDirector's return value.
144
self.assert_(r == handlers[2])
145
calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
146
for i in range(len(o.calls)):
147
handler, name, args, kwds = o.calls[i]
148
self.assert_((handler, name) == calls[i])
149
self.assert_(args == (req,))
151
def test_handler_order(self):
152
from ClientCookie._urllib2_support import methnames
155
for meths, handler_order in [
156
([("http_open", "return self")], 500),
159
class MockHandlerSubclass(MockHandler): pass
160
h = MockHandlerSubclass(meths)
161
h.handler_order = handler_order
165
r = o.open("http://example.com/")
166
# handlers called in reverse order, thanks to their sort order
167
self.assert_(o.calls[0][0] == handlers[1])
168
self.assert_(o.calls[1][0] == handlers[0])
170
def test_raise(self):
171
# raising URLError stops processing of request
174
[("http_open", "raise")],
175
[("http_open", "return self")],
177
handlers = add_ordered_mock_handlers(o, meth_spec)
179
req = Request("http://example.com/")
180
self.assertRaises(urllib2.URLError, o.open, req)
181
self.assert_(o.calls == [(handlers[0], "http_open", (req,), {})])
183
## def test_error(self):
184
## # XXX this doesn't actually seem to be used in standard library,
185
## # but should really be tested anyway...
187
def test_http_error(self):
188
# XXX http_error_default
189
# http errors are a special case
192
[("http_open", "error 302")],
193
[("http_error_400", "raise"), "http_open"],
194
[("http_error_302", "return response"), "http_error_303",
196
[("http_error_302")],
198
handlers = add_ordered_mock_handlers(o, meth_spec)
202
req = Request("http://example.com/")
204
assert len(o.calls) == 2
205
calls = [(handlers[0], "http_open", (req,)),
206
(handlers[2], "http_error_302", (req, Unknown, 302, "", {}))]
207
for i in range(len(calls)):
208
handler, method_name, args, kwds = o.calls[i]
209
self.assert_((handler, method_name) == calls[i][:2])
210
# check handler methods were called with expected arguments
211
expected_args = calls[i][2]
212
for j in range(len(args)):
213
if expected_args[j] is not Unknown:
214
self.assert_(args[j] == expected_args[j])
216
def test_processors(self):
217
# *_request / *_response methods get called appropriately
220
[("http_request", "return request"),
221
("http_response", "return response")],
222
[("http_request", "return request"),
223
("http_response", "return response")],
225
handlers = add_ordered_mock_handlers(o, meth_spec)
227
req = Request("http://example.com/")
229
# processor methods are called on *all* handlers that define them,
230
# not just the first handler
231
calls = [(handlers[0], "http_request"), (handlers[1], "http_request"),
232
(handlers[0], "http_response"), (handlers[1], "http_response")]
234
for i in range(len(o.calls)):
235
handler, name, args, kwds = o.calls[i]
238
self.assert_((handler, name) == calls[i])
239
self.assert_(len(args) == 1)
240
self.assert_(isinstance(args[0], Request))
243
self.assert_((handler, name) == calls[i])
244
self.assert_(len(args) == 2)
245
self.assert_(isinstance(args[0], Request))
246
# response from opener.open is None, because there's no
247
# handler that defines http_open to handle it
248
self.assert_(args[1] is None or
249
isinstance(args[1], MockResponse))
254
self.req_headers = []
256
self.raise_on_endheaders = False
257
def __call__(self, host):
260
def set_debuglevel(self, level):
    """Store the requested debug level on the instance."""
    self.level = level
261
def putrequest(self, method, selector):
    """Record the request method and selector on the instance."""
    self.method = method
    self.selector = selector
263
def putheader(self, key, value):
    """Append the header as a ``(key, value)`` pair to ``self.req_headers``."""
    header = (key, value)
    self.req_headers.append(header)
265
def endheaders(self):
266
if self.raise_on_endheaders:
269
def send(self, data):
    """Keep the payload on the instance instead of transmitting it."""
    self.data = data
270
def getreply(self):
    """Return a fixed successful reply: status 200, "OK" and empty headers."""
    status = 200
    reason = "OK"
    headers = {}
    return status, reason, headers
271
def getfile(self):
    """Return a fresh MockFile instance."""
    mock_file = MockFile()
    return mock_file
274
class MockFTPWrapper:
    """Stand-in FTP wrapper that serves a fixed string."""

    def __init__(self, data):
        self.data = data

    def retrfile(self, filename, filetype):
        """Record the requested name and type; return (file-like object, length)."""
        self.filename = filename
        self.filetype = filetype
        body = StringIO.StringIO(self.data)
        size = len(self.data)
        return body, size
280
class NullFTPHandler(urllib2.FTPHandler):
281
def __init__(self, data): self.data = data
282
def connect_ftp(self, user, passwd, host, port, dirs):
283
self.user, self.passwd = user, passwd
284
self.host, self.port = host, port
286
self.ftpwrapper = MockFTPWrapper(self.data)
287
return self.ftpwrapper
289
def sanepathname2url(path):
291
urlpath = urllib.pathname2url(path)
292
if os.name == "nt" and urlpath.startswith("///"):
293
urlpath = urlpath[2:]
294
# XXX don't ask me about the mac...
297
class MockRobotFileParserClass:
300
self._can_fetch = True
304
self.calls.append("__call__")
306
def set_url(self, url):
    """Log the call as a ``("set_url", url)`` entry in ``self.calls``."""
    entry = ("set_url", url)
    self.calls.append(entry)
309
self.calls.append("read")
310
def can_fetch(self, ua, url):
    """Log the query in ``self.calls`` and answer with ``self._can_fetch``."""
    entry = ("can_fetch", ua, url)
    self.calls.append(entry)
    return self._can_fetch
314
class HandlerTests(unittest.TestCase):
316
if hasattr(sys, "version_info") and sys.version_info > (2, 1, 3, "final", 0):
319
import ftplib, socket
320
data = "rheum rhaponicum"
321
h = NullFTPHandler(data)
322
o = h.parent = MockOpener()
324
for url, host, port, type_, dirs, filename, mimetype in [
325
("ftp://localhost/foo/bar/baz.html",
326
"localhost", ftplib.FTP_PORT, "I",
327
["foo", "bar"], "baz.html", "text/html"),
328
# XXXX Bug: FTPHandler tries to gethostbyname "localhost:80",
329
# with the port still there.
330
#("ftp://localhost:80/foo/bar/",
331
# "localhost", 80, "D",
332
# ["foo", "bar"], "", None),
333
# XXXX bug: second use of splitattr() in FTPHandler should be
335
#("ftp://localhost/baz.gif;type=a",
336
# "localhost", ftplib.FTP_PORT, "A",
337
# [], "baz.gif", "image/gif"),
339
r = h.ftp_open(Request(url))
340
# ftp authentication not yet implemented by FTPHandler
341
self.assert_(h.user == h.passwd == "")
342
self.assert_(h.host == socket.gethostbyname(host))
343
self.assert_(h.port == port)
344
self.assert_(h.dirs == dirs)
345
self.assert_(h.ftpwrapper.filename == filename)
346
self.assert_(h.ftpwrapper.filetype == type_)
348
self.assert_(headers["Content-type"] == mimetype)
349
self.assert_(int(headers["Content-length"]) == len(data))
352
import time, rfc822, socket
353
h = urllib2.FileHandler()
354
o = h.parent = MockOpener()
356
#TESTFN = test_support.TESTFN
358
urlpath = sanepathname2url(os.path.abspath(TESTFN))
359
towrite = "hello, world\n"
361
"file://localhost%s" % urlpath,
362
"file://%s" % urlpath,
363
"file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
364
"file://%s%s" % (socket.gethostbyname(socket.gethostname()),
367
f = open(TESTFN, "wb")
374
r = h.file_open(Request(url))
381
stats = os.stat(TESTFN)
382
modified = rfc822.formatdate(stats.st_mtime)
385
self.assertEqual(data, towrite)
386
self.assertEqual(headers["Content-type"], "text/plain")
387
self.assertEqual(headers["Content-length"], "13")
388
self.assertEqual(headers["Last-modified"], modified)
391
"file://localhost:80%s" % urlpath,
392
# XXXX bug: these fail with socket.gaierror, should be URLError
393
## "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
394
## os.getcwd(), TESTFN),
395
## "file://somerandomhost.ontheinternet.com%s/%s" %
396
## (os.getcwd(), TESTFN),
399
f = open(TESTFN, "wb")
405
self.assertRaises(urllib2.URLError,
406
h.file_open, Request(url))
410
h = urllib2.FileHandler()
411
o = h.parent = MockOpener()
412
# XXXX why does // mean ftp (and /// mean not ftp!), and where
413
# is file: scheme specified? I think this is really a bug, and
414
# what was intended was to distinguish between URLs like:
415
# file:/blah.txt (a file)
416
# file://localhost/blah.txt (a file)
417
# file:///blah.txt (a file)
418
# file://ftp.example.com/blah.txt (an ftp URL)
420
("file://ftp.example.com//foo.txt", True),
421
("file://ftp.example.com///foo.txt", False),
422
# XXXX bug: fails with OSError, should be URLError
423
("file://ftp.example.com/foo.txt", False),
428
# XXXX remove OSError when bug fixed
429
except (urllib2.URLError, OSError):
430
self.assert_(not ftp)
432
self.assert_(o.req is req)
433
self.assertEqual(req.type, "ftp")
435
## def test_file(self):
436
## import time, rfc822, socket
437
## h = urllib2.FileHandler()
438
## o = h.parent = MockOpener()
440
## #from test_support import TESTFN
441
## TESTFN = "test.txt"
442
## towrite = "hello, world\n"
444
## "file://localhost%s/%s" % (os.getcwd(), TESTFN),
445
## "file://%s/%s" % (os.getcwd(), TESTFN),
446
## "file://%s%s/%s" % (socket.gethostbyname('localhost'),
447
## os.getcwd(), TESTFN),
448
## "file://%s%s/%s" % (socket.gethostbyname(socket.gethostname()),
449
## os.getcwd(), TESTFN),
450
## # XXX Windows / Mac format(s), ... ?
452
## create_time = time.time()
453
## f = open(TESTFN, "w")
460
## r = h.file_open(Request(url))
463
## headers = r.info()
464
## newurl = r.geturl()
467
## stats = os.stat(TESTFN)
468
## modified = rfc822.formatdate(stats.st_mtime)
471
## self.assert_(data == towrite)
472
## self.assert_(headers["Content-type"] == "text/plain")
473
## self.assert_(headers["Content-length"] == "13")
474
## ## # Fudge Last-modified string comparison by one second to
475
## ## # prevent spurious failure on crossing a second boundary while
476
## ## # executing this test.
477
## ## # XXXX This test still fails occasionally! Why?
478
## ## unfudged = rfc822.formatdate(create_time)
479
## ## fudged = rfc822.formatdate(create_time+1)
480
## ## self.assert_(headers["Last-modified"] in [unfudged, fudged])
481
## self.assert_(headers["Last-modified"] == modified)
484
## "file://localhost:80%s/%s" % (os.getcwd(), TESTFN),
485
## # XXXX bug: these fail with socket.gaierror, should be URLError
486
## #"file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
487
## # os.getcwd(), TESTFN),
488
## #"file://somerandomhost.ontheinternet.com%s/%s" %
489
## #(os.getcwd(), TESTFN),
492
## f = open(TESTFN, "w")
498
## self.assertRaises(urllib2.URLError,
499
## h.file_open, Request(url))
503
## h = urllib2.FileHandler()
504
## o = h.parent = MockOpener()
505
## # XXXX why does // mean ftp (and /// mean not ftp!), and where
506
## # is file: scheme specified? I think this is really a bug, and
507
## # what was intended was to distinguish between URLs like:
508
## # file:/blah.txt (a file)
509
## # file://localhost/blah.txt (a file)
510
## # file:///blah.txt (a file)
511
## # file://ftp.example.com/blah.txt (an ftp URL)
513
## ("file://ftp.example.com//foo.txt", True),
514
## ("file://ftp.example.com///foo.txt", False),
515
## # XXXX bug: fails with OSError, should be URLError
516
## ("file://ftp.example.com/foo.txt", False),
518
## req = Request(url)
521
## except (urllib2.URLError, OSError): # XXXX remove OSError
522
## self.assert_(not ftp)
524
## self.assert_(o.req is req)
525
## self.assert_(req.type == "ftp")
528
h = AbstractHTTPHandler()
529
o = h.parent = MockOpener()
531
url = "http://example.com/"
532
for method, data in [("GET", None), ("POST", "blah")]:
533
req = Request(url, data, {"Foo": "bar"})
534
req.add_unredirected_header("Spam", "eggs")
535
http = MockHTTPClass()
536
r = h.do_open(http, req)
539
r.read; r.readline # wrapped MockFile methods
540
r.info; r.geturl # addinfourl methods
541
r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply()
543
hdrs.get; hdrs.has_key # r.info() gives dict from .getreply()
544
self.assert_(r.geturl() == url)
546
self.assert_(http.host == "example.com")
547
self.assert_(http.level == 0)
548
self.assert_(http.method == method)
549
self.assert_(http.selector == "/")
550
http.req_headers.sort()
551
self.assert_(http.req_headers == [("Foo", "bar"), ("Spam", "eggs")])
552
self.assert_(http.data == data)
554
# check socket.error converted to URLError
555
http.raise_on_endheaders = True
556
self.assertRaises(urllib2.URLError, h.do_open, http, req)
558
# check adding of standard headers
559
o.addheaders = [("Spam", "eggs")]
560
for data in "", None: # POST, GET
561
req = Request("http://example.com/", data)
562
r = MockResponse(200, "OK", {}, "")
563
newreq = h.do_request_(req)
564
if data is None: # GET
565
self.assert_(not req.unredirected_hdrs.has_key("Content-length"))
566
self.assert_(not req.unredirected_hdrs.has_key("Content-type"))
568
self.assert_(req.unredirected_hdrs["Content-length"] == "0")
569
self.assert_(req.unredirected_hdrs["Content-type"] ==
570
"application/x-www-form-urlencoded")
571
# XXX the details of Host could be better tested
572
self.assert_(req.unredirected_hdrs["Host"] == "example.com")
573
self.assert_(req.unredirected_hdrs["Spam"] == "eggs")
575
# don't clobber existing headers
576
req.add_unredirected_header("Content-length", "foo")
577
req.add_unredirected_header("Content-type", "bar")
578
req.add_unredirected_header("Host", "baz")
579
req.add_unredirected_header("Spam", "foo")
580
newreq = h.do_request_(req)
581
self.assert_(req.unredirected_hdrs["Content-length"] == "foo")
582
self.assert_(req.unredirected_hdrs["Content-type"] == "bar")
583
self.assert_(req.unredirected_hdrs["Host"] == "baz")
584
self.assert_(req.unredirected_hdrs["Spam"] == "foo")
586
def test_request_upgrade(self):
    processor = HTTPRequestUpgradeProcessor()
    processor.parent = MockOpener()

    # a plain urllib2.Request must come back as a different object whose
    # class is ClientCookie's Request
    plain = urllib2.Request("http://example.com/")
    upgraded = processor.http_request(plain)
    self.assert_(upgraded is not plain)
    self.assert_(upgraded.__class__ is Request)

    # a ClientCookie._urllib2_support.Request must be handed back as-is
    native = Request("http://example.com/")
    returned = processor.http_request(native)
    self.assert_(returned is native)
    self.assert_(returned.__class__ is Request)
601
def test_referer(self):
602
h = HTTPRefererProcessor()
603
o = h.parent = MockOpener()
606
url = "http://example.com/"
608
r = MockResponse(200, "OK", {}, "", url)
609
newr = h.http_response(req, r)
610
self.assert_(r is newr)
611
self.assert_(h.referer == url)
612
newreq = h.http_request(req)
613
self.assert_(req is newreq)
614
self.assert_(req.unredirected_hdrs["Referer"] == url)
615
# don't clobber existing Referer
616
ref = "http://set.by.user.com/"
617
req.add_unredirected_header("Referer", ref)
618
newreq = h.http_request(req)
619
self.assert_(req is newreq)
620
self.assert_(req.unredirected_hdrs["Referer"] == ref)
622
def test_errors(self):
623
h = HTTPErrorProcessor()
624
o = h.parent = MockOpener()
626
url = "http://example.com/"
628
# 200 OK is passed through
629
r = MockResponse(200, "OK", {}, "", url)
630
newr = h.http_response(req, r)
631
self.assert_(r is newr)
632
self.assert_(not hasattr(o, "proto")) # o.error not called
633
# anything else calls o.error (and MockOpener returns None, here)
634
r = MockResponse(201, "Created", {}, "", url)
635
self.assert_(h.http_response(req, r) is None)
636
self.assert_(o.proto == "http") # o.error called
637
self.assert_(o.args == (req, r, 201, "Created", {}))
639
def test_robots(self):
641
rfpc = MockRobotFileParserClass()
642
h = HTTPRobotRulesProcessor(rfpc)
644
url = "http://example.com:80/foo/bar.html"
646
# first time: initialise and set up robots.txt parser before checking
647
# whether OK to fetch URL
649
self.assert_(rfpc.calls == [
651
("set_url", "http://example.com:80/robots.txt"),
653
("can_fetch", "", url),
655
# second time: just use existing parser
659
self.assert_(rfpc.calls == [
660
("can_fetch", "", url),
662
# different URL on same server: same again
664
url = "http://example.com:80/blah.html"
667
self.assert_(rfpc.calls == [
668
("can_fetch", "", url),
672
rfpc._can_fetch = False
673
url = "http://example.com:80/rhubarb.html"
677
except urllib2.HTTPError, e:
678
self.assert_(e.request == req)
679
self.assert_(e.code == 403)
680
# new host: reload robots.txt (even though the host and port are
681
# unchanged, we treat this as a new host because
682
# "example.com" != "example.com:80")
684
rfpc._can_fetch = True
685
url = "http://example.com/rhubarb.html"
688
self.assert_(rfpc.calls == [
690
("set_url", "http://example.com/robots.txt"),
692
("can_fetch", "", url),
695
def test_cookies(self):
697
h = HTTPCookieProcessor(cj)
698
o = h.parent = MockOpener()
700
req = urllib2.Request("http://example.com/")
701
r = MockResponse(200, "OK", {}, "")
702
newreq = h.http_request(req)
703
self.assert_(cj.ach_req is req is newreq)
704
self.assert_(req.origin_req_host == "example.com")
705
self.assert_(cj.ach_u == False)
706
newr = h.http_response(req, r)
707
self.assert_(cj.ec_req is req)
708
self.assert_(cj.ec_r is r is newr)
709
self.assert_(cj.ec_u == False)
711
def test_seekable(self):
    processor = SeekableProcessor()
    processor.parent = MockOpener()

    request = urllib2.Request("http://example.com/")

    class MockUnseekableResponse:
        pass

    raw = MockUnseekableResponse()
    wrapped = processor.http_response(request, raw)
    # the original response is left without seek(); the returned one has it
    self.assert_(not hasattr(raw, "seek"))
    self.assert_(hasattr(wrapped, "seek"))
722
def test_http_equiv(self):
723
h = HTTPEquivProcessor()
724
o = h.parent = MockOpener()
726
req = Request("http://example.com/")
727
r = MockResponse(200, "OK", {"Foo": "Bar"},
729
'<meta http-equiv="Refresh" content="spam">'
731
newr = h.http_response(req, r)
732
headers = newr.info()
733
self.assert_(headers["Refresh"] == "spam")
734
self.assert_(headers["Foo"] == "Bar")
736
def test_refresh(self):
    # XXX processor constructor optional args
    processor = HTTPRefreshProcessor()
    opener = MockOpener()
    processor.parent = opener

    request = Request("http://example.com/")
    refresh_headers = MockHeaders(
        {"refresh": '0; url="http://example.com/foo/"'})
    response = MockResponse(200, "OK", refresh_headers, "")
    processor.http_response(request, response)
    # the processor must have reported a "refresh" error to its parent
    self.assert_(opener.proto == "http")
    self.assert_(opener.args ==
                 (request, response, "refresh", "OK", refresh_headers))
748
def test_redirect(self):
749
from_url = "http://example.com/a.html"
750
to_url = "http://example.com/b.html"
751
h = HTTPRedirectHandler()
752
o = h.parent = MockOpener()
754
# ordinary redirect behaviour
755
for code in 301, 302, 303, 307, "refresh":
756
for data in None, "blah\nblah\n":
757
method = getattr(h, "http_error_%s" % code)
758
req = Request(from_url, data)
759
req.add_header("Nonsense", "viking=withhold")
760
req.add_unredirected_header("Spam", "spam")
761
req.origin_req_host = "example.com" # XXX
763
method(req, MockFile(), code, "Blah",
764
MockHeaders({"location": to_url}))
765
except urllib2.HTTPError:
766
# 307 in response to POST requires user OK
767
self.assert_(code == 307 and data is not None)
768
self.assert_(o.req.get_full_url() == to_url)
770
self.assert_(o.req.get_method() == "GET")
771
except AttributeError:
772
self.assert_(not o.req.has_data())
773
self.assert_(o.req.headers["Nonsense"] == "viking=withhold")
774
self.assert_(not o.req.headers.has_key("Spam"))
775
self.assert_(not o.req.unredirected_hdrs.has_key("Spam"))
778
def redirect(h, req, url=to_url):
779
h.http_error_302(req, MockFile(), 302, "Blah",
780
MockHeaders({"location": url}))
781
# Note that the *original* request shares the same record of
782
# redirections with the sub-requests caused by the redirections.
784
# detect infinite loop redirect of a URL to itself
785
req = Request(from_url)
786
req.origin_req_host = "example.com"
790
redirect(h, req, "http://example.com/")
792
except urllib2.HTTPError:
793
# don't stop until max_repeats, because cookies may introduce state
794
self.assert_(count == HTTPRedirectHandler.max_repeats)
796
# detect endless non-repeating chain of redirects
797
req = Request(from_url)
798
req.origin_req_host = "example.com"
802
redirect(h, req, "http://example.com/%d" % count)
804
except urllib2.HTTPError:
805
self.assert_(count == HTTPRedirectHandler.max_redirections)
808
class HeadParserTests(unittest.TestCase):
813
("""<meta http-equiv="refresh" content="1; http://example.com/">
815
[("refresh", "1; http://example.com/")]
819
<meta http-equiv="refresh" content="1; http://example.com/">
820
<meta name="spam" content="eggs">
821
<meta http-equiv="foo" content="bar">
822
<p> <!-- p is not allowed in head, so parsing should stop here-->
823
<meta http-equiv="moo" content="cow">
826
[("refresh", "1; http://example.com/"), ("foo", "bar")])
828
for html, result in htmls:
829
self.assert_(parse_head(StringIO.StringIO(html)) == result)
832
class MyHTTPHandler(HTTPHandler):
    """HTTPHandler subclass that adds nothing of its own."""
833
class FooHandler(urllib2.BaseHandler):
    """BaseHandler subclass whose only addition is a no-op ``foo_open``."""

    def foo_open(self):
        return None
835
class BarHandler(urllib2.BaseHandler):
    """BaseHandler subclass whose only addition is a no-op ``bar_open``."""

    def bar_open(self):
        return None
849
class FunctionTests(unittest.TestCase):
851
def test_build_opener(self):
852
o = build_opener(FooHandler, BarHandler)
853
self.opener_has_handler(o, FooHandler)
854
self.opener_has_handler(o, BarHandler)
856
# can take a mix of classes and instances
857
o = build_opener(FooHandler, BarHandler())
858
self.opener_has_handler(o, FooHandler)
859
self.opener_has_handler(o, BarHandler)
861
# subclasses of default handlers override default handlers
862
o = build_opener(MyHTTPHandler)
863
self.opener_has_handler(o, MyHTTPHandler)
865
# a particular case of overriding: default handlers can be passed
868
self.opener_has_handler(o, HTTPHandler)
869
o = build_opener(HTTPHandler)
870
self.opener_has_handler(o, HTTPHandler)
871
o = build_opener(HTTPHandler())
872
self.opener_has_handler(o, HTTPHandler)
874
def opener_has_handler(self, opener, handler_class):
875
for h in opener.handlers:
876
if h.__class__ == handler_class:
881
def _methnames(self, *objs):
882
from ClientCookie._urllib2_support import methnames
884
for i in range(len(objs)):
886
names = methnames(obj)
888
# special methods vary over Python versions
889
names = filter(lambda mn: mn[0:2] != "__" , names)
893
def test_methnames(self):
    # names found on freshly created instances
    names_a, names_b, names_c, names_d = self._methnames(A(), B(), C(), D())
    self.assert_(names_a == ["a"])
    self.assert_(names_b == ["a", "b"])
    self.assert_(names_c == ["a", "c"])
    self.assert_(names_d == ["a", "b", "c", "d"])

    # callables attached to individual instances must be listed as well
    obj_a, obj_b, obj_c, obj_d = A(), B(), C(), D()
    obj_a.x = lambda self: None
    obj_b.y = lambda self: None
    obj_d.z = lambda self: None
    names_a, names_b, names_c, names_d = self._methnames(
        obj_a, obj_b, obj_c, obj_d)
    self.assert_(names_a == ["a", "x"])
    self.assert_(names_b == ["a", "b", "y"])
    self.assert_(names_c == ["a", "c"])
    self.assert_(names_d == ["a", "b", "c", "d", "z"])
912
if __name__ == "__main__":