~vcs-imports/clientcookie/trunk

« back to all changes in this revision

Viewing changes to test/test_urllib2.py

  • Committer: jjlee
  • Date: 2004-03-31 19:36:11 UTC
  • Revision ID: svn-v4:fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada:user/jjlee/wwwsearch/ClientCookie:3571
Initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Tests for ClientCookie._urllib2_support."""
 
2
 
 
3
# XXX
 
4
# Request (I'm too lazy)
 
5
# CacheFTPHandler (hard to write)
 
6
# parse_keqv_list, parse_http_list (I'm leaving this for Anthony Baxter
 
7
#  and Greg Stein, since they're doing Digest Authentication)
 
8
# Authentication stuff (ditto)
 
9
# ProxyHandler, CustomProxy, CustomProxyHandler (I don't use a proxy)
 
10
# GopherHandler (haven't used gopher for a decade or so...)
 
11
 
 
12
import unittest, StringIO, os, sys, UserDict
 
13
 
 
14
import urllib2
 
15
from ClientCookie._urllib2_support import Request, AbstractHTTPHandler, \
 
16
     build_opener, parse_head, urlopen
 
17
from ClientCookie._Util import startswith
 
18
from ClientCookie import HTTPRedirectHandler, HTTPRequestUpgradeProcessor, \
 
19
     HTTPEquivProcessor, HTTPRefreshProcessor, SeekableProcessor, \
 
20
     HTTPCookieProcessor, HTTPRefererProcessor, HTTPRobotRulesProcessor, \
 
21
     HTTPErrorProcessor, HTTPHandler
 
22
from ClientCookie import OpenerDirector
 
23
 
 
24
try: True
 
25
except NameError:
 
26
    True = 1
 
27
    False = 0
 
28
 
 
29
class MockOpener:
    """Stand-in for an OpenerDirector that only records how it was called.

    open() stores its arguments in self.req / self.data; error() stores its
    arguments in self.proto / self.args.  Both return None.
    """
    addheaders = []

    def open(self, req, data=None):
        # Remember the request and data for later inspection by the test.
        self.req, self.data = req, data

    def error(self, proto, *args):
        # Remember the protocol and the remaining positional arguments.
        self.proto, self.args = proto, args
 
35
 
 
36
class MockFile:
    """File-like object whose read(), readline() and close() do nothing."""

    def read(self, count=None):
        pass

    def readline(self, count=None):
        pass

    def close(self):
        pass
 
40
 
 
41
class MockHeaders(UserDict.UserDict):
    """Dictionary of headers offering a getallmatchingheaders() method."""

    def getallmatchingheaders(self, name):
        """Return every stored header formatted as a "key: value" string.

        NOTE: *name* is ignored, so all headers are returned whatever name
        is asked for.
        """
        return ["%s: %s" % (k, v) for k, v in self.data.items()]
 
47
 
 
48
class MockResponse(StringIO.StringIO):
    """In-memory response: a StringIO body plus code, msg, headers and url.

    code, msg, headers and url are stored as attributes of the same name;
    info() returns the headers and geturl() returns the url (None unless
    one was given).
    """

    def __init__(self, code, msg, headers, data, url=None):
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url

    def info(self):
        return self.headers

    def geturl(self):
        return self.url
 
56
 
 
57
class MockCookieJar:
    """Cookie jar stand-in that records the arguments of its two methods."""

    def add_cookie_header(self, request, unverifiable=False):
        # Stored as ach_req / ach_u ("ach" = add_cookie_header).
        self.ach_req, self.ach_u = request, unverifiable

    def extract_cookies(self, response, request, unverifiable=False):
        # Stored as ec_req / ec_r / ec_u ("ec" = extract_cookies).
        self.ec_req, self.ec_r, self.ec_u = request, response, unverifiable
 
62
 
 
63
class MockMethod:
    """Callable that forwards to *handle*, prefixing its name and action.

    Calling the object with arguments ``args`` returns
    ``handle(meth_name, action, *args)``.
    """

    def __init__(self, meth_name, action, handle):
        self.meth_name = meth_name
        self.handle = handle
        self.action = action

    def __call__(self, *args):
        # Extended call syntax instead of the deprecated apply() builtin.
        return self.handle(self.meth_name, self.action, *args)
 
70
 
 
71
class MockHandler:
    """Handler whose methods are generated from a spec and log their calls.

    Each entry of *methods* is either a method name (the generated method
    returns None) or a (name, action) pair.  Recognised actions are
    "return self", "return response", "return request", "raise" and any
    string starting with "error" whose last three characters are an
    integer code (e.g. "error 302").  Every call is appended to
    self.parent.calls as a (handler, method name, args, kwds) tuple.
    """
    processor_order = 500

    def __init__(self, methods):
        self._define_methods(methods)

    def _define_methods(self, methods):
        # The generated methods are set on the class, not the instance.
        for spec in methods:
            # NOTE: any spec of length two is unpacked as (name, action),
            # so a bare two-character method name would be misread.
            if len(spec) == 2:
                name, action = spec
            else:
                name, action = spec, None
            meth = MockMethod(name, action, self.handle)
            setattr(self.__class__, name, meth)

    def handle(self, fn_name, action, *args, **kwds):
        """Log the call on the parent, then carry out *action*."""
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            return MockResponse(200, "OK", {}, "")
        elif action == "return request":
            return Request("http://blah/")
        elif startswith(action, "error"):
            # The last three characters of the action are the error code;
            # args[0] is the first argument the method was called with.
            code = int(action[-3:])
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib2.URLError("blah")
        # Raised explicitly (rather than "assert False") so that an unknown
        # action is still reported when Python runs with -O.
        raise AssertionError("unknown action: %r" % (action,))

    def close(self):
        pass

    def add_parent(self, parent):
        # Attaching to a parent also resets the parent's call log.
        self.parent = parent
        self.parent.calls = []

    def __cmp__(self, other):
        if hasattr(other, "handler_order"):
            return cmp(self.handler_order, other.handler_order)
        # No handler_order, leave in original order.  Yuck.
        return -1
        #return cmp(id(self), id(other))
 
109
 
 
110
 
 
111
def add_ordered_mock_handlers(opener, meth_spec):
    """Add one mock handler per entry of meth_spec to opener.

    Each entry of meth_spec is a list of method specs as accepted by
    MockHandler.  The n-th handler (counting from zero) gets both
    handler_order and processor_order set to n.  The list of created
    handlers is returned, in the order of meth_spec.
    """
    handlers = []
    count = 0
    for meths in meth_spec:
        # A separate subclass for each handler: MockHandler sets its
        # generated methods on the handler's class.
        class MockHandlerSubclass(MockHandler):
            pass
        h = MockHandlerSubclass(meths)
        h.handler_order = h.processor_order = count
        h.add_parent(opener)
        count = count + 1
        handlers.append(h)
        opener.add_handler(h)
    return handlers
 
123
 
 
124
class OpenerDirectorTests(unittest.TestCase):
    # Tests of how OpenerDirector dispatches to handler methods.  The
    # handlers are MockHandler instances, which log every call made on them
    # in the opener's .calls list as (handler, name, args, kwds).

    def test_handled(self):
        # handler returning non-None means no more handlers will be called
        o = OpenerDirector()
        meth_spec = [
            ["http_open", "ftp_open", "http_error_302"],
            ["ftp_open"],
            [("http_open", "return self")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        r = o.open(req)
        # Second http_open gets called, third doesn't, since second returned
        # non-None.  Handlers without http_open never get any methods called
        # on them.
        # In fact, second mock handler returns self (instead of response),
        # which becomes the OpenerDirector's return value.
        self.assert_(r == handlers[2])
        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
        # Check the number of calls first, so that a missing call cannot
        # slip through the loop below unnoticed.
        self.assertEqual(len(o.calls), len(calls))
        for (handler, name, args, kwds), expected in zip(o.calls, calls):
            self.assert_((handler, name) == expected)
            self.assert_(args == (req,))

    def test_handler_order(self):
        o = OpenerDirector()
        handlers = []
        for meths, handler_order in [
            ([("http_open", "return self")], 500),
            (["http_open"], 0),
            ]:
            class MockHandlerSubclass(MockHandler):
                pass
            h = MockHandlerSubclass(meths)
            h.handler_order = handler_order
            handlers.append(h)
            o.add_handler(h)

        o.open("http://example.com/")
        # handlers called in reverse order, thanks to their sort order
        self.assert_(o.calls[0][0] == handlers[1])
        self.assert_(o.calls[1][0] == handlers[0])

    def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(urllib2.URLError, o.open, req)
        self.assert_(o.calls == [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         #  but should really be tested anyway...

    def test_http_error(self):
        # XXX http_error_default
        # http errors are a special case
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "error 302")],
            [("http_error_400", "raise"), "http_open"],
            [("http_error_302", "return response"), "http_error_303",
             "http_error"],
            [("http_error_302")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        # Placeholder for an expected argument whose value is not checked.
        class Unknown:
            pass

        req = Request("http://example.com/")
        o.open(req)
        calls = [(handlers[0], "http_open", (req,)),
                 (handlers[2], "http_error_302", (req, Unknown, 302, "", {}))]
        # A unittest assertion rather than a bare assert statement, so the
        # check is not dropped when Python runs with -O.
        self.assertEqual(len(o.calls), len(calls))
        for (handler, method_name, args, kwds), expected in zip(o.calls,
                                                                calls):
            self.assert_((handler, method_name) == expected[:2])
            # check handler methods were called with expected arguments
            expected_args = expected[2]
            self.assertEqual(len(args), len(expected_args))
            for actual, wanted in zip(args, expected_args):
                if wanted is not Unknown:
                    self.assert_(actual == wanted)

    def test_processors(self):
        # *_request / *_response methods get called appropriately
        o = OpenerDirector()
        meth_spec = [
            [("http_request", "return request"),
             ("http_response", "return response")],
            [("http_request", "return request"),
             ("http_response", "return response")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        o.open(req)
        # processor methods are called on *all* handlers that define them,
        # not just the first handler
        calls = [(handlers[0], "http_request"), (handlers[1], "http_request"),
                 (handlers[0], "http_response"), (handlers[1], "http_response")]

        # Check the number of calls first, so that a missing call cannot
        # slip through the loop below unnoticed.
        self.assertEqual(len(o.calls), len(calls))
        for i in range(len(o.calls)):
            handler, name, args, kwds = o.calls[i]
            self.assert_((handler, name) == calls[i])
            if i < 2:
                # *_request
                self.assertEqual(len(args), 1)
                self.assert_(isinstance(args[0], Request))
            else:
                # *_response
                self.assertEqual(len(args), 2)
                self.assert_(isinstance(args[0], Request))
                # response from opener.open is None, because there's no
                # handler that defines http_open to handle it
                self.assert_(args[1] is None or
                             isinstance(args[1], MockResponse))
 
250
 
 
251
 
 
252
class MockHTTPClass:
    """Records the HTTP connection calls made on it instead of doing I/O.

    The instance is used in place of a connection *class*: calling it with
    a host stores the host and returns the instance itself.  Set
    raise_on_endheaders to make endheaders() raise socket.error.
    """

    def __init__(self):
        self.req_headers = []
        self.data = None
        self.raise_on_endheaders = False

    def __call__(self, host):
        self.host = host
        return self

    def set_debuglevel(self, level):
        self.level = level

    def putrequest(self, method, selector):
        self.method, self.selector = method, selector

    def putheader(self, key, value):
        self.req_headers.append((key, value))

    def endheaders(self):
        if self.raise_on_endheaders:
            import socket
            raise socket.error()

    def send(self, data):
        self.data = data

    def getreply(self):
        # Always reports success with no headers.
        return 200, "OK", {}

    def getfile(self):
        return MockFile()
 
272
 
 
273
 
 
274
class MockFTPWrapper:
    """FTP wrapper stand-in that serves a fixed string as every file."""

    def __init__(self, data):
        self.data = data

    def retrfile(self, filename, filetype):
        """Record the request; return (file object over data, len(data))."""
        self.filename, self.filetype = filename, filetype
        return StringIO.StringIO(self.data), len(self.data)
 
279
 
 
280
class NullFTPHandler(urllib2.FTPHandler):
    """FTPHandler that never connects: connect_ftp returns a MockFTPWrapper.

    The arguments of the last connect_ftp call are kept as attributes
    (user, passwd, host, port, dirs) and the wrapper it returned is kept
    as self.ftpwrapper.
    """

    def __init__(self, data):
        self.data = data

    def connect_ftp(self, user, passwd, host, port, dirs, timeout=None):
        # timeout is accepted (and ignored) because newer versions of
        # urllib2.FTPHandler.ftp_open pass it as an extra argument.
        self.user, self.passwd = user, passwd
        self.host, self.port = host, port
        self.dirs = dirs
        self.ftpwrapper = MockFTPWrapper(self.data)
        return self.ftpwrapper
 
288
 
 
289
def sanepathname2url(path):
    """Return path converted by pathname2url to the path part of a URL.

    On Windows (os.name == "nt") a result starting with "///" has its
    first two slashes removed.
    """
    try:
        from urllib import pathname2url
    except ImportError:
        # Location of the function on Python versions without
        # urllib.pathname2url.
        from urllib.request import pathname2url
    urlpath = pathname2url(path)
    if os.name == "nt" and urlpath.startswith("///"):
        urlpath = urlpath[2:]
    # XXX don't ask me about the mac...
    return urlpath
 
296
 
 
297
class MockRobotFileParserClass:
    """Robots.txt parser stand-in that logs its calls in self.calls.

    The instance is used in place of a parser *class*: calling it logs
    "__call__" and returns the instance itself.  can_fetch() returns the
    current value of self._can_fetch (True unless changed by the test).
    """

    def __init__(self):
        self.calls = []
        self._can_fetch = True

    def clear(self):
        # Forget the calls logged so far; _can_fetch is left untouched.
        self.calls = []

    def __call__(self):
        self.calls.append("__call__")
        return self

    def set_url(self, url):
        self.calls.append(("set_url", url))

    def read(self):
        self.calls.append("read")

    def can_fetch(self, ua, url):
        self.calls.append(("can_fetch", ua, url))
        return self._can_fetch
 
313
 
 
314
class HandlerTests(unittest.TestCase):
 
315
 
 
316
    if hasattr(sys, "version_info") and sys.version_info > (2, 1, 3, "final", 0):
 
317
 
 
318
        def test_ftp(self):
 
319
            import ftplib, socket
 
320
            data = "rheum rhaponicum"
 
321
            h = NullFTPHandler(data)
 
322
            o = h.parent = MockOpener()
 
323
 
 
324
            for url, host, port, type_, dirs, filename, mimetype in [
 
325
                ("ftp://localhost/foo/bar/baz.html",
 
326
                 "localhost", ftplib.FTP_PORT, "I",
 
327
                 ["foo", "bar"], "baz.html", "text/html"),
 
328
                # XXXX Bug: FTPHandler tries to gethostbyname "localhost:80",
 
329
                #  with the port still there.
 
330
                #("ftp://localhost:80/foo/bar/",
 
331
                # "localhost", 80, "D",
 
332
                # ["foo", "bar"], "", None),
 
333
                # XXXX bug: second use of splitattr() in FTPHandler should be
 
334
                #  splitvalue()
 
335
                #("ftp://localhost/baz.gif;type=a",
 
336
                # "localhost", ftplib.FTP_PORT, "A",
 
337
                # [], "baz.gif", "image/gif"),
 
338
                ]:
 
339
                r = h.ftp_open(Request(url))
 
340
                # ftp authentication not yet implemented by FTPHandler
 
341
                self.assert_(h.user == h.passwd == "")
 
342
                self.assert_(h.host == socket.gethostbyname(host))
 
343
                self.assert_(h.port == port)
 
344
                self.assert_(h.dirs == dirs)
 
345
                self.assert_(h.ftpwrapper.filename == filename)
 
346
                self.assert_(h.ftpwrapper.filetype == type_)
 
347
                headers = r.info()
 
348
                self.assert_(headers["Content-type"] == mimetype)
 
349
                self.assert_(int(headers["Content-length"]) == len(data))
 
350
 
 
351
        def test_file(self):
 
352
            import time, rfc822, socket
 
353
            h = urllib2.FileHandler()
 
354
            o = h.parent = MockOpener()
 
355
 
 
356
            #TESTFN = test_support.TESTFN
 
357
            TESTFN = "test.txt"
 
358
            urlpath = sanepathname2url(os.path.abspath(TESTFN))
 
359
            towrite = "hello, world\n"
 
360
            for url in [
 
361
                "file://localhost%s" % urlpath,
 
362
                "file://%s" % urlpath,
 
363
                "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
 
364
                "file://%s%s" % (socket.gethostbyname(socket.gethostname()),
 
365
                                 urlpath),
 
366
                ]:
 
367
                f = open(TESTFN, "wb")
 
368
                try:
 
369
                    try:
 
370
                        f.write(towrite)
 
371
                    finally:
 
372
                        f.close()
 
373
 
 
374
                    r = h.file_open(Request(url))
 
375
                    try:
 
376
                        data = r.read()
 
377
                        headers = r.info()
 
378
                        newurl = r.geturl()
 
379
                    finally:
 
380
                        r.close()
 
381
                    stats = os.stat(TESTFN)
 
382
                    modified = rfc822.formatdate(stats.st_mtime)
 
383
                finally:
 
384
                    os.remove(TESTFN)
 
385
                self.assertEqual(data, towrite)
 
386
                self.assertEqual(headers["Content-type"], "text/plain")
 
387
                self.assertEqual(headers["Content-length"], "13")
 
388
                self.assertEqual(headers["Last-modified"], modified)
 
389
 
 
390
            for url in [
 
391
                "file://localhost:80%s" % urlpath,
 
392
    # XXXX bug: these fail with socket.gaierror, should be URLError
 
393
    ##             "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
 
394
    ##                                    os.getcwd(), TESTFN),
 
395
    ##             "file://somerandomhost.ontheinternet.com%s/%s" %
 
396
    ##             (os.getcwd(), TESTFN),
 
397
                ]:
 
398
                try:
 
399
                    f = open(TESTFN, "wb")
 
400
                    try:
 
401
                        f.write(towrite)
 
402
                    finally:
 
403
                        f.close()
 
404
 
 
405
                    self.assertRaises(urllib2.URLError,
 
406
                                      h.file_open, Request(url))
 
407
                finally:
 
408
                    os.remove(TESTFN)
 
409
 
 
410
            h = urllib2.FileHandler()
 
411
            o = h.parent = MockOpener()
 
412
            # XXXX why does // mean ftp (and /// mean not ftp!), and where
 
413
            #  is file: scheme specified?  I think this is really a bug, and
 
414
            #  what was intended was to distinguish between URLs like:
 
415
            # file:/blah.txt (a file)
 
416
            # file://localhost/blah.txt (a file)
 
417
            # file:///blah.txt (a file)
 
418
            # file://ftp.example.com/blah.txt (an ftp URL)
 
419
            for url, ftp in [
 
420
                ("file://ftp.example.com//foo.txt", True),
 
421
                ("file://ftp.example.com///foo.txt", False),
 
422
    # XXXX bug: fails with OSError, should be URLError
 
423
                ("file://ftp.example.com/foo.txt", False),
 
424
                ]:
 
425
                req = Request(url)
 
426
                try:
 
427
                    h.file_open(req)
 
428
                # XXXX remove OSError when bug fixed
 
429
                except (urllib2.URLError, OSError):
 
430
                    self.assert_(not ftp)
 
431
                else:
 
432
                    self.assert_(o.req is req)
 
433
                    self.assertEqual(req.type, "ftp")
 
434
 
 
435
##         def test_file(self):
 
436
##             import time, rfc822, socket
 
437
##             h = urllib2.FileHandler()
 
438
##             o = h.parent = MockOpener()
 
439
 
 
440
##             #from test_support import TESTFN
 
441
##             TESTFN = "test.txt"
 
442
##             towrite = "hello, world\n"
 
443
##             for url in [
 
444
##                 "file://localhost%s/%s" % (os.getcwd(), TESTFN),
 
445
##                 "file://%s/%s" % (os.getcwd(), TESTFN),
 
446
##                 "file://%s%s/%s" % (socket.gethostbyname('localhost'),
 
447
##                                     os.getcwd(), TESTFN),
 
448
##                 "file://%s%s/%s" % (socket.gethostbyname(socket.gethostname()),
 
449
##                                     os.getcwd(), TESTFN),
 
450
##                 # XXX Windows / Mac format(s), ... ?
 
451
##                 ]:
 
452
##                 create_time = time.time()
 
453
##                 f = open(TESTFN, "w")
 
454
##                 try:
 
455
##                     try:
 
456
##                         f.write(towrite)
 
457
##                     finally:
 
458
##                         f.close()
 
459
 
 
460
##                     r = h.file_open(Request(url))
 
461
##                     try:
 
462
##                         data = r.read()
 
463
##                         headers = r.info()
 
464
##                         newurl = r.geturl()
 
465
##                     finally:
 
466
##                         r.close()
 
467
##                     stats = os.stat(TESTFN)
 
468
##                     modified = rfc822.formatdate(stats.st_mtime)
 
469
##                 finally:
 
470
##                     os.remove(TESTFN)
 
471
##                 self.assert_(data == towrite)
 
472
##                 self.assert_(headers["Content-type"] == "text/plain")
 
473
##                 self.assert_(headers["Content-length"] == "13")
 
474
## ##                 # Fudge Last-modified string comparison by one second to
 
475
## ##                 # prevent spurious failure on crossing a second boundary while
 
476
## ##                 # executing this test.
 
477
## ##                 # XXXX This test still fails occasionally!  Why?
 
478
## ##                 unfudged = rfc822.formatdate(create_time)
 
479
## ##                 fudged = rfc822.formatdate(create_time+1)
 
480
## ##                 self.assert_(headers["Last-modified"] in [unfudged, fudged])
 
481
##                 self.assert_(headers["Last-modified"] == modified)
 
482
 
 
483
##             for url in [
 
484
##                 "file://localhost:80%s/%s" % (os.getcwd(), TESTFN),
 
485
##                 # XXXX bug: these fail with socket.gaierror, should be URLError
 
486
##                 #"file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
 
487
##                 #                       os.getcwd(), TESTFN),
 
488
##                 #"file://somerandomhost.ontheinternet.com%s/%s" %
 
489
##                 #(os.getcwd(), TESTFN),
 
490
##                 ]:
 
491
##                 try:
 
492
##                     f = open(TESTFN, "w")
 
493
##                     try:
 
494
##                         f.write(towrite)
 
495
##                     finally:
 
496
##                         f.close()
 
497
 
 
498
##                     self.assertRaises(urllib2.URLError,
 
499
##                                       h.file_open, Request(url))
 
500
##                 finally:
 
501
##                     os.remove(TESTFN)
 
502
 
 
503
##             h = urllib2.FileHandler()
 
504
##             o = h.parent = MockOpener()
 
505
##             # XXXX why does // mean ftp (and /// mean not ftp!), and where
 
506
##             #  is file: scheme specified?  I think this is really a bug, and
 
507
##             #  what was intended was to distinguish between URLs like:
 
508
##             # file:/blah.txt (a file)
 
509
##             # file://localhost/blah.txt (a file)
 
510
##             # file:///blah.txt (a file)
 
511
##             # file://ftp.example.com/blah.txt (an ftp URL)
 
512
##             for url, ftp in [
 
513
##                 ("file://ftp.example.com//foo.txt", True),
 
514
##                 ("file://ftp.example.com///foo.txt", False),
 
515
##                 # XXXX bug: fails with OSError, should be URLError
 
516
##                 ("file://ftp.example.com/foo.txt", False),
 
517
##                 ]:
 
518
##                 req = Request(url)
 
519
##                 try:
 
520
##                     h.file_open(req)
 
521
##                 except (urllib2.URLError, OSError):  # XXXX remove OSError
 
522
##                     self.assert_(not ftp)
 
523
##                 else:
 
524
##                     self.assert_(o.req is req)
 
525
##                     self.assert_(req.type == "ftp")
 
526
 
 
527
    def test_http(self):
 
528
        h = AbstractHTTPHandler()
 
529
        o = h.parent = MockOpener()
 
530
 
 
531
        url = "http://example.com/"
 
532
        for method, data in [("GET", None), ("POST", "blah")]:
 
533
            req = Request(url, data, {"Foo": "bar"})
 
534
            req.add_unredirected_header("Spam", "eggs")
 
535
            http = MockHTTPClass()
 
536
            r = h.do_open(http, req)
 
537
 
 
538
            # result attributes
 
539
            r.read; r.readline  # wrapped MockFile methods
 
540
            r.info; r.geturl  # addinfourl methods
 
541
            r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
 
542
            hdrs = r.info()
 
543
            hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
 
544
            self.assert_(r.geturl() == url)
 
545
 
 
546
            self.assert_(http.host == "example.com")
 
547
            self.assert_(http.level == 0)
 
548
            self.assert_(http.method == method)
 
549
            self.assert_(http.selector == "/")
 
550
            http.req_headers.sort()
 
551
            self.assert_(http.req_headers == [("Foo", "bar"), ("Spam", "eggs")])
 
552
            self.assert_(http.data == data)
 
553
 
 
554
        # check socket.error converted to URLError
 
555
        http.raise_on_endheaders = True
 
556
        self.assertRaises(urllib2.URLError, h.do_open, http, req)
 
557
 
 
558
        # check adding of standard headers
 
559
        o.addheaders = [("Spam", "eggs")]
 
560
        for data in "", None:  # POST, GET
 
561
            req = Request("http://example.com/", data)
 
562
            r = MockResponse(200, "OK", {}, "")
 
563
            newreq = h.do_request_(req)
 
564
            if data is None:  # GET
 
565
                self.assert_(not req.unredirected_hdrs.has_key("Content-length"))
 
566
                self.assert_(not req.unredirected_hdrs.has_key("Content-type"))
 
567
            else:  # POST
 
568
                self.assert_(req.unredirected_hdrs["Content-length"] == "0")
 
569
                self.assert_(req.unredirected_hdrs["Content-type"] ==
 
570
                             "application/x-www-form-urlencoded")
 
571
            # XXX the details of Host could be better tested
 
572
            self.assert_(req.unredirected_hdrs["Host"] == "example.com")
 
573
            self.assert_(req.unredirected_hdrs["Spam"] == "eggs")
 
574
 
 
575
            # don't clobber existing headers
 
576
            req.add_unredirected_header("Content-length", "foo")
 
577
            req.add_unredirected_header("Content-type", "bar")
 
578
            req.add_unredirected_header("Host", "baz")
 
579
            req.add_unredirected_header("Spam", "foo")
 
580
            newreq = h.do_request_(req)
 
581
            self.assert_(req.unredirected_hdrs["Content-length"] == "foo")
 
582
            self.assert_(req.unredirected_hdrs["Content-type"] == "bar")
 
583
            self.assert_(req.unredirected_hdrs["Host"] == "baz")
 
584
            self.assert_(req.unredirected_hdrs["Spam"] == "foo")
 
585
 
 
586
    def test_request_upgrade(self):
 
587
        h = HTTPRequestUpgradeProcessor()
 
588
        o = h.parent = MockOpener()
 
589
 
 
590
        # urllib2.Request gets upgraded
 
591
        req = urllib2.Request("http://example.com/")
 
592
        newreq = h.http_request(req)
 
593
        self.assert_(newreq is not req)
 
594
        self.assert_(newreq.__class__ is Request)
 
595
        # ClientCookie._urllib2_support.Request doesn't
 
596
        req = Request("http://example.com/")
 
597
        newreq = h.http_request(req)
 
598
        self.assert_(newreq is req)
 
599
        self.assert_(newreq.__class__ is Request)
 
600
 
 
601
    def test_referer(self):
 
602
        h = HTTPRefererProcessor()
 
603
        o = h.parent = MockOpener()
 
604
 
 
605
        # normal case
 
606
        url = "http://example.com/"
 
607
        req = Request(url)
 
608
        r = MockResponse(200, "OK", {}, "", url)
 
609
        newr = h.http_response(req, r)
 
610
        self.assert_(r is newr)
 
611
        self.assert_(h.referer == url)
 
612
        newreq = h.http_request(req)
 
613
        self.assert_(req is newreq)
 
614
        self.assert_(req.unredirected_hdrs["Referer"] == url)
 
615
        # don't clobber existing Referer
 
616
        ref = "http://set.by.user.com/"
 
617
        req.add_unredirected_header("Referer", ref)
 
618
        newreq = h.http_request(req)
 
619
        self.assert_(req is newreq)
 
620
        self.assert_(req.unredirected_hdrs["Referer"] == ref)
 
621
 
 
622
    def test_errors(self):
 
623
        h = HTTPErrorProcessor()
 
624
        o = h.parent = MockOpener()
 
625
 
 
626
        url = "http://example.com/"
 
627
        req = Request(url)
 
628
        # 200 OK is passed through
 
629
        r = MockResponse(200, "OK", {}, "", url)
 
630
        newr = h.http_response(req, r)
 
631
        self.assert_(r is newr)
 
632
        self.assert_(not hasattr(o, "proto"))  # o.error not called
 
633
        # anything else calls o.error (and MockOpener returns None, here)
 
634
        r = MockResponse(201, "Created", {}, "", url)
 
635
        self.assert_(h.http_response(req, r) is None)
 
636
        self.assert_(o.proto == "http")  # o.error called
 
637
        self.assert_(o.args == (req, r, 201, "Created", {}))
 
638
 
 
639
    def test_robots(self):
 
640
        # XXX useragent
 
641
        rfpc = MockRobotFileParserClass()
 
642
        h = HTTPRobotRulesProcessor(rfpc)
 
643
 
 
644
        url = "http://example.com:80/foo/bar.html"
 
645
        req = Request(url)
 
646
        # first time: initialise and set up robots.txt parser before checking
 
647
        #  whether OK to fetch URL
 
648
        h.http_request(req)
 
649
        self.assert_(rfpc.calls == [
 
650
            "__call__",
 
651
            ("set_url", "http://example.com:80/robots.txt"),
 
652
            "read",
 
653
            ("can_fetch", "", url),
 
654
            ])
 
655
        # second time: just use existing parser
 
656
        rfpc.clear()
 
657
        req = Request(url)
 
658
        h.http_request(req)
 
659
        self.assert_(rfpc.calls == [
 
660
            ("can_fetch", "", url),
 
661
            ])
 
662
        # different URL on same server: same again
 
663
        rfpc.clear()
 
664
        url = "http://example.com:80/blah.html"
 
665
        req = Request(url)
 
666
        h.http_request(req)
 
667
        self.assert_(rfpc.calls == [
 
668
            ("can_fetch", "", url),
 
669
            ])
 
670
        # disallowed URL
 
671
        rfpc.clear()
 
672
        rfpc._can_fetch = False
 
673
        url = "http://example.com:80/rhubarb.html"
 
674
        req = Request(url)
 
675
        try:
 
676
            h.http_request(req)
 
677
        except urllib2.HTTPError, e:
 
678
            self.assert_(e.request == req)
 
679
            self.assert_(e.code == 403)
 
680
        # new host: reload robots.txt (even though the host and port are
 
681
        #  unchanged, we treat this as a new host because
 
682
        #  "example.com" != "example.com:80")
 
683
        rfpc.clear()
 
684
        rfpc._can_fetch = True
 
685
        url = "http://example.com/rhubarb.html"
 
686
        req = Request(url)
 
687
        h.http_request(req)
 
688
        self.assert_(rfpc.calls == [
 
689
            "__call__",
 
690
            ("set_url", "http://example.com/robots.txt"),
 
691
            "read",
 
692
            ("can_fetch", "", url),
 
693
            ])
 
694
 
 
695
    def test_cookies(self):
 
696
        cj = MockCookieJar()
 
697
        h = HTTPCookieProcessor(cj)
 
698
        o = h.parent = MockOpener()
 
699
 
 
700
        req = urllib2.Request("http://example.com/")
 
701
        r = MockResponse(200, "OK", {}, "")
 
702
        newreq = h.http_request(req)
 
703
        self.assert_(cj.ach_req is req is newreq)
 
704
        self.assert_(req.origin_req_host == "example.com")
 
705
        self.assert_(cj.ach_u == False)
 
706
        newr = h.http_response(req, r)
 
707
        self.assert_(cj.ec_req is req)
 
708
        self.assert_(cj.ec_r is r is newr)
 
709
        self.assert_(cj.ec_u == False)
 
710
 
 
711
    def test_seekable(self):
 
712
        h = SeekableProcessor()
 
713
        o = h.parent = MockOpener()
 
714
 
 
715
        req = urllib2.Request("http://example.com/")
 
716
        class MockUnseekableResponse: pass
 
717
        r = MockUnseekableResponse()
 
718
        newr = h.http_response(req, r)
 
719
        self.assert_(not hasattr(r, "seek"))
 
720
        self.assert_(hasattr(newr, "seek"))
 
721
 
 
722
    def test_http_equiv(self):
 
723
        h = HTTPEquivProcessor()
 
724
        o = h.parent = MockOpener()
 
725
 
 
726
        req = Request("http://example.com/")
 
727
        r = MockResponse(200, "OK", {"Foo": "Bar"},
 
728
                         '<html><head>'
 
729
                         '<meta http-equiv="Refresh" content="spam">'
 
730
                         '</head></html>')
 
731
        newr = h.http_response(req, r)
 
732
        headers = newr.info()
 
733
        self.assert_(headers["Refresh"] == "spam")
 
734
        self.assert_(headers["Foo"] == "Bar")
 
735
 
 
736
    def test_refresh(self):
 
737
        # XXX processor constructor optional args
 
738
        h = HTTPRefreshProcessor()
 
739
        o = h.parent = MockOpener()
 
740
 
 
741
        req = Request("http://example.com/")
 
742
        headers = MockHeaders({"refresh": '0; url="http://example.com/foo/"'})
 
743
        r = MockResponse(200, "OK", headers, "")
 
744
        newr = h.http_response(req, r)
 
745
        self.assert_(o.proto == "http")
 
746
        self.assert_(o.args == (req, r, "refresh", "OK", headers))
 
747
 
 
748
    def test_redirect(self):
 
749
        from_url = "http://example.com/a.html"
 
750
        to_url = "http://example.com/b.html"
 
751
        h = HTTPRedirectHandler()
 
752
        o = h.parent = MockOpener()
 
753
 
 
754
        # ordinary redirect behaviour
 
755
        for code in 301, 302, 303, 307, "refresh":
 
756
            for data in None, "blah\nblah\n":
 
757
                method = getattr(h, "http_error_%s" % code)
 
758
                req = Request(from_url, data)
 
759
                req.add_header("Nonsense", "viking=withhold")
 
760
                req.add_unredirected_header("Spam", "spam")
 
761
                req.origin_req_host = "example.com"  # XXX
 
762
                try:
 
763
                    method(req, MockFile(), code, "Blah",
 
764
                           MockHeaders({"location": to_url}))
 
765
                except urllib2.HTTPError:
 
766
                    # 307 in response to POST requires user OK
 
767
                    self.assert_(code == 307 and data is not None)
 
768
                self.assert_(o.req.get_full_url() == to_url)
 
769
                try:
 
770
                    self.assert_(o.req.get_method() == "GET")
 
771
                except AttributeError:
 
772
                    self.assert_(not o.req.has_data())
 
773
                self.assert_(o.req.headers["Nonsense"] == "viking=withhold")
 
774
                self.assert_(not o.req.headers.has_key("Spam"))
 
775
                self.assert_(not o.req.unredirected_hdrs.has_key("Spam"))
 
776
 
 
777
        # loop detection
 
778
        def redirect(h, req, url=to_url):
 
779
            h.http_error_302(req, MockFile(), 302, "Blah",
 
780
                             MockHeaders({"location": url}))
 
781
        # Note that the *original* request shares the same record of
 
782
        # redirections with the sub-requests caused by the redirections.
 
783
 
 
784
        # detect infinite loop redirect of a URL to itself
 
785
        req = Request(from_url)
 
786
        req.origin_req_host = "example.com"
 
787
        count = 0
 
788
        try:
 
789
            while 1:
 
790
                redirect(h, req, "http://example.com/")
 
791
                count = count + 1
 
792
        except urllib2.HTTPError:
 
793
            # don't stop until max_repeats, because cookies may introduce state
 
794
            self.assert_(count == HTTPRedirectHandler.max_repeats)
 
795
 
 
796
        # detect endless non-repeating chain of redirects
 
797
        req = Request(from_url)
 
798
        req.origin_req_host = "example.com"
 
799
        count = 0
 
800
        try:
 
801
            while 1:
 
802
                redirect(h, req, "http://example.com/%d" % count)
 
803
                count = count + 1
 
804
        except urllib2.HTTPError:
 
805
            self.assert_(count == HTTPRedirectHandler.max_redirections)
 
806
 
 
807
 
 
808
class HeadParserTests(unittest.TestCase):
    # Checks parse_head() against small HTML fixtures.

    def test(self):
        # XXX XHTML
        # Each entry pairs an HTML document with the list of
        # (http-equiv name, content) tuples parse_head() must return for it.
        htmls = [
            ("""<meta http-equiv="refresh" content="1; http://example.com/">
            """,
             [("refresh", "1; http://example.com/")]
             ),
            ("""
            <html><head>
            <meta http-equiv="refresh" content="1; http://example.com/">
            <meta name="spam" content="eggs">
            <meta http-equiv="foo" content="bar">
            <p> <!-- p is not allowed in head, so parsing should stop here-->
            <meta http-equiv="moo" content="cow">
            </html>
            """,
             [("refresh", "1; http://example.com/"), ("foo", "bar")]
             ),
            ]
        for html, result in htmls:
            # assertEqual rather than assert_(a == b), so that a failure
            # shows the list parse_head() actually produced
            self.assertEqual(parse_head(StringIO.StringIO(html)), result)
 
830
 
 
831
 
 
832
class MyHTTPHandler(HTTPHandler):
    # Adds nothing to HTTPHandler: exists only so that the tests have a
    # distinct subclass of a default handler to pass to build_opener().
    pass
 
833
class FooHandler(urllib2.BaseHandler):
    # Minimal handler fixture; its only method of its own is a do-nothing
    # foo_open().
    def foo_open(self):
        pass
 
835
class BarHandler(urllib2.BaseHandler):
    # Minimal handler fixture; its only method of its own is a do-nothing
    # bar_open().
    def bar_open(self):
        pass
 
837
 
 
838
# Diamond-shaped class hierarchy used as a fixture by the methnames tests:
# A is the root, B and C both derive from A, and D derives from (C, B).
# Every method is a no-op; only the method names matter.

class A:
    def a(self):
        pass


class B(A):
    # overrides a() and adds b()
    def a(self):
        pass

    def b(self):
        pass


class C(A):
    # inherits a() and adds c()
    def c(self):
        pass


class D(C, B):
    # overrides a() again and adds d(); base order (C, B) is deliberate
    def a(self):
        pass

    def d(self):
        pass
 
848
 
 
849
class FunctionTests(unittest.TestCase):
    # Tests for two helper functions: build_opener() and methnames().

    def test_build_opener(self):
        o = build_opener(FooHandler, BarHandler)
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # can take a mix of classes and instances
        o = build_opener(FooHandler, BarHandler())
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # subclasses of default handlers override default handlers
        o = build_opener(MyHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)

        # a particular case of overriding: default handlers can be passed
        # in explicitly
        o = build_opener()
        self.opener_has_handler(o, HTTPHandler)
        o = build_opener(HTTPHandler)
        self.opener_has_handler(o, HTTPHandler)
        o = build_opener(HTTPHandler())
        self.opener_has_handler(o, HTTPHandler)

    def opener_has_handler(self, opener, handler_class):
        # Fail the test unless opener.handlers contains an object whose
        # class is exactly handler_class (an instance of a subclass does
        # not count).
        for h in opener.handlers:
            if h.__class__ == handler_class:
                return
        # name the missing class, so the failure says which handler was
        # expected instead of a bare "False is not true"
        self.fail("opener has no handler of class %s" %
                  (handler_class.__name__,))

    def _methnames(self, *objs):
        # Return, for each object in objs, the sorted list of names reported
        # by methnames(), with names starting "__" removed.
        from ClientCookie._urllib2_support import methnames
        r = []
        for obj in objs:
            names = methnames(obj)
            names.sort()
            # special methods vary over Python versions
            names = filter(lambda mn: mn[0:2] != "__", names)
            r.append(names)
        return r

    def test_methnames(self):
        # methods defined on the classes, including inherited ones
        a, b, c, d = A(), B(), C(), D()
        a, b, c, d = self._methnames(a, b, c, d)
        self.assertEqual(a, ["a"])
        self.assertEqual(b, ["a", "b"])
        self.assertEqual(c, ["a", "c"])
        self.assertEqual(d, ["a", "b", "c", "d"])

        # callables attached to individual instances must be listed too
        a, b, c, d = A(), B(), C(), D()
        a.x = lambda self: None
        b.y = lambda self: None
        d.z = lambda self: None
        a, b, c, d = self._methnames(a, b, c, d)
        self.assertEqual(a, ["a", "x"])
        self.assertEqual(b, ["a", "b", "y"])
        self.assertEqual(c, ["a", "c"])
        self.assertEqual(d, ["a", "b", "c", "d", "z"])
 
910
 
 
911
 
 
912
if __name__ == "__main__":
    # executed as a script: hand control to unittest's command-line runner
    unittest.main()