~ubuntu-branches/ubuntu/lucid/python-httplib2/lucid-proposed

« back to all changes in this revision

Viewing changes to python2/httplib2test.py

  • Committer: Bazaar Package Importer
  • Author(s): Luca Falavigna
  • Date: 2010-01-03 17:12:21 UTC
  • mfrom: (1.1.3 upstream) (2.1.2 sid)
  • Revision ID: james.westby@ubuntu.com-20100103171221-qyc6dmr91jh0wzr1
* New upstream release.
* Add new copyright holders.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python2.4
 
2
"""
 
3
httplib2test
 
4
 
 
5
A set of unit tests for httplib2.py.
 
6
 
 
7
Requires Python 2.4 or later
 
8
"""
 
9
 
 
10
__author__ = "Joe Gregorio (joe@bitworking.org)"
 
11
__copyright__ = "Copyright 2006, Joe Gregorio"
 
12
__contributors__ = []
 
13
__license__ = "MIT"
 
14
__history__ = """ """
 
15
__version__ = "0.1 ($Rev: 118 $)"
 
16
 
 
17
 
 
18
import sys
 
19
import unittest
 
20
import httplib
 
21
import httplib2
 
22
import os
 
23
import urlparse
 
24
import time
 
25
import base64
 
26
import StringIO
 
27
 
 
28
# Python 2.3 support
 
29
if not hasattr(unittest.TestCase, 'assertTrue'):
 
30
    unittest.TestCase.assertTrue = unittest.TestCase.failUnless
 
31
    unittest.TestCase.assertFalse = unittest.TestCase.failIf
 
32
 
 
33
# The test resources base uri
 
34
base = 'http://bitworking.org/projects/httplib2/test/'
 
35
#base = 'http://localhost/projects/httplib2/test/'
 
36
cacheDirName = ".cache"
 
37
 
 
38
 
 
39
class CredentialsTest(unittest.TestCase):
 
40
    def test(self):
 
41
        c = httplib2.Credentials()
 
42
        c.add("joe", "password")
 
43
        self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
 
44
        self.assertEqual(("joe", "password"), list(c.iter(""))[0])
 
45
        c.add("fred", "password2", "wellformedweb.org")
 
46
        self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
 
47
        self.assertEqual(1, len(list(c.iter("bitworking.org"))))
 
48
        self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
 
49
        self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
 
50
        c.clear()
 
51
        self.assertEqual(0, len(list(c.iter("bitworking.org"))))
 
52
        c.add("fred", "password2", "wellformedweb.org")
 
53
        self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
 
54
        self.assertEqual(0, len(list(c.iter("bitworking.org"))))
 
55
        self.assertEqual(0, len(list(c.iter(""))))
 
56
 
 
57
 
 
58
class ParserTest(unittest.TestCase):
 
59
    def testFromStd66(self):
 
60
        self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
 
61
        self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
 
62
        self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
 
63
        self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
 
64
        self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
 
65
        self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
 
66
        self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
 
67
        self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
 
68
 
 
69
 
 
70
class UrlNormTest(unittest.TestCase):
 
71
    def test(self):
 
72
        self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
 
73
        self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
 
74
        self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
 
75
        self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
 
76
        self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
 
77
        self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
 
78
        try:
 
79
            httplib2.urlnorm("/")
 
80
            self.fail("Non-absolute URIs should raise an exception")
 
81
        except httplib2.RelativeURIError:
 
82
            pass
 
83
 
 
84
class UrlSafenameTest(unittest.TestCase):
 
85
    def test(self):
 
86
        # Test that different URIs end up generating different safe names
 
87
        self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
 
88
        self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
 
89
        self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
 
90
        self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
 
91
        self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
 
92
        self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
 
93
        # Test the max length limits
 
94
        uri = "http://" + ("w" * 200) + ".org"
 
95
        uri2 = "http://" + ("w" * 201) + ".org"
 
96
        self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
 
97
        # Max length should be 200 + 1 (",") + 32
 
98
        self.assertEqual(233, len(httplib2.safename(uri2)))
 
99
        self.assertEqual(233, len(httplib2.safename(uri)))
 
100
        # Unicode
 
101
        if sys.version_info >= (2,3):
 
102
            self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename(u"http://\u2304.org/fred/?a=b"))
 
103
 
 
104
class _MyResponse(StringIO.StringIO):
 
105
    def __init__(self, body, **kwargs):
 
106
        StringIO.StringIO.__init__(self, body)
 
107
        self.headers = kwargs
 
108
 
 
109
    def iteritems(self):
 
110
        return self.headers.iteritems()
 
111
 
 
112
 
 
113
class _MyHTTPConnection(object):
 
114
    "This class is just a mock of httplib.HTTPConnection used for testing"
 
115
 
 
116
    def __init__(self, host, port=None, key_file=None, cert_file=None,
 
117
                 strict=None, timeout=None, proxy_info=None):
 
118
        self.host = host
 
119
        self.port = port
 
120
        self.timeout = timeout
 
121
        self.log = ""
 
122
 
 
123
    def set_debuglevel(self, level):
 
124
        pass
 
125
 
 
126
    def connect(self):
 
127
        "Connect to a host on a given port."
 
128
        pass
 
129
 
 
130
    def close(self):
 
131
        pass
 
132
 
 
133
    def request(self, method, request_uri, body, headers):
 
134
        pass
 
135
 
 
136
    def getresponse(self):
 
137
        return _MyResponse("the body", status="200")
 
138
 
 
139
 
 
140
class HttpTest(unittest.TestCase):
 
141
    def setUp(self):
 
142
        if os.path.exists(cacheDirName): 
 
143
            [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
 
144
        self.http = httplib2.Http(cacheDirName)
 
145
        self.http.clear_credentials()
 
146
 
 
147
    def testConnectionType(self):
 
148
        self.http.force_exception_to_status_code = False 
 
149
        response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
 
150
        self.assertEqual(response['content-location'], "http://bitworking.org")
 
151
        self.assertEqual(content, "the body")
 
152
 
 
153
    def testGetUnknownServer(self):
 
154
        self.http.force_exception_to_status_code = False 
 
155
        try:
 
156
            self.http.request("http://fred.bitworking.org/")
 
157
            self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
 
158
        except httplib2.ServerNotFoundError:
 
159
            pass
 
160
 
 
161
        # Now test with exceptions turned off
 
162
        self.http.force_exception_to_status_code = True
 
163
 
 
164
        (response, content) = self.http.request("http://fred.bitworking.org/")
 
165
        self.assertEqual(response['content-type'], 'text/plain')
 
166
        self.assertTrue(content.startswith("Unable to find"))
 
167
        self.assertEqual(response.status, 400)
 
168
 
 
169
    def testGetIRI(self):
 
170
        if sys.version_info >= (2,3):
 
171
            uri = urlparse.urljoin(base, u"reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
 
172
            (response, content) = self.http.request(uri, "GET")
 
173
            d = self.reflector(content)
 
174
            self.assertTrue(d.has_key('QUERY_STRING')) 
 
175
            self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0) 
 
176
    
 
177
    def testGetIsDefaultMethod(self):
 
178
        # Test that GET is the default method
 
179
        uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
 
180
        (response, content) = self.http.request(uri)
 
181
        self.assertEqual(response['x-method'], "GET")
 
182
 
 
183
    def testDifferentMethods(self):
 
184
        # Test that all methods can be used
 
185
        uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
 
186
        for method in ["GET", "PUT", "DELETE", "POST"]:
 
187
            (response, content) = self.http.request(uri, method, body=" ")
 
188
            self.assertEqual(response['x-method'], method)
 
189
 
 
190
    def testHeadRead(self):
 
191
        # Test that we don't try to read the response of a HEAD request
 
192
        # since httplib blocks response.read() for HEAD requests.
 
193
        # Oddly enough this doesn't appear as a problem when doing HEAD requests
 
194
        # against Apache servers.
 
195
        uri = "http://www.google.com/"
 
196
        (response, content) = self.http.request(uri, "HEAD")
 
197
        self.assertEqual(response.status, 200)
 
198
        self.assertEqual(content, "")
 
199
 
 
200
    def testGetNoCache(self):
 
201
        # Test that can do a GET w/o the cache turned on.
 
202
        http = httplib2.Http()
 
203
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
204
        (response, content) = http.request(uri, "GET")
 
205
        self.assertEqual(response.status, 200)
 
206
        self.assertEqual(response.previous, None)
 
207
 
 
208
    def testGetOnlyIfCachedCacheHit(self):
 
209
        # Test that can do a GET with cache and 'only-if-cached'
 
210
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
211
        (response, content) = self.http.request(uri, "GET")
 
212
        (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
 
213
        self.assertEqual(response.fromcache, True)
 
214
        self.assertEqual(response.status, 200)
 
215
 
 
216
    def testGetOnlyIfCachedCacheMiss(self):
 
217
        # Test that can do a GET with no cache with 'only-if-cached'
 
218
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
219
        (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
 
220
        self.assertEqual(response.fromcache, False)
 
221
        self.assertEqual(response.status, 504)
 
222
 
 
223
    def testGetOnlyIfCachedNoCacheAtAll(self):
 
224
        # Test that can do a GET with no cache with 'only-if-cached'
 
225
        # Of course, there might be an intermediary beyond us
 
226
        # that responds to the 'only-if-cached', so this
 
227
        # test can't really be guaranteed to pass.
 
228
        http = httplib2.Http()
 
229
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
230
        (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
 
231
        self.assertEqual(response.fromcache, False)
 
232
        self.assertEqual(response.status, 504)
 
233
 
 
234
    def testUserAgent(self):
 
235
        # Test that we provide a default user-agent
 
236
        uri = urlparse.urljoin(base, "user-agent/test.cgi")
 
237
        (response, content) = self.http.request(uri, "GET")
 
238
        self.assertEqual(response.status, 200)
 
239
        self.assertTrue(content.startswith("Python-httplib2/"))
 
240
 
 
241
    def testUserAgentNonDefault(self):
 
242
        # Test that the default user-agent can be over-ridden
 
243
 
 
244
        uri = urlparse.urljoin(base, "user-agent/test.cgi")
 
245
        (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
 
246
        self.assertEqual(response.status, 200)
 
247
        self.assertTrue(content.startswith("fred/1.0"))
 
248
 
 
249
    def testGet300WithLocation(self):
 
250
        # Test the we automatically follow 300 redirects if a Location: header is provided
 
251
        uri = urlparse.urljoin(base, "300/with-location-header.asis")
 
252
        (response, content) = self.http.request(uri, "GET")
 
253
        self.assertEqual(response.status, 200)
 
254
        self.assertEqual(content, "This is the final destination.\n")
 
255
        self.assertEqual(response.previous.status, 300)
 
256
        self.assertEqual(response.previous.fromcache, False)
 
257
 
 
258
        # Confirm that the intermediate 300 is not cached
 
259
        (response, content) = self.http.request(uri, "GET")
 
260
        self.assertEqual(response.status, 200)
 
261
        self.assertEqual(content, "This is the final destination.\n")
 
262
        self.assertEqual(response.previous.status, 300)
 
263
        self.assertEqual(response.previous.fromcache, False)
 
264
 
 
265
    def testGet300WithLocationNoRedirect(self):
 
266
        # Test the we automatically follow 300 redirects if a Location: header is provided
 
267
        self.http.follow_redirects = False
 
268
        uri = urlparse.urljoin(base, "300/with-location-header.asis")
 
269
        (response, content) = self.http.request(uri, "GET")
 
270
        self.assertEqual(response.status, 300)
 
271
 
 
272
    def testGet300WithoutLocation(self):
 
273
        # Not giving a Location: header in a 300 response is acceptable
 
274
        # In which case we just return the 300 response
 
275
        uri = urlparse.urljoin(base, "300/without-location-header.asis")
 
276
        (response, content) = self.http.request(uri, "GET")
 
277
        self.assertEqual(response.status, 300)
 
278
        self.assertTrue(response['content-type'].startswith("text/html"))
 
279
        self.assertEqual(response.previous, None)
 
280
 
 
281
    def testGet301(self):
 
282
        # Test that we automatically follow 301 redirects
 
283
        # and that we cache the 301 response
 
284
        uri = urlparse.urljoin(base, "301/onestep.asis")
 
285
        destination = urlparse.urljoin(base, "302/final-destination.txt")
 
286
        (response, content) = self.http.request(uri, "GET")
 
287
        self.assertEqual(response.status, 200)
 
288
        self.assertTrue(response.has_key('content-location'))
 
289
        self.assertEqual(response['content-location'], destination)
 
290
        self.assertEqual(content, "This is the final destination.\n")
 
291
        self.assertEqual(response.previous.status, 301)
 
292
        self.assertEqual(response.previous.fromcache, False)
 
293
 
 
294
        (response, content) = self.http.request(uri, "GET")
 
295
        self.assertEqual(response.status, 200)
 
296
        self.assertEqual(response['content-location'], destination)
 
297
        self.assertEqual(content, "This is the final destination.\n")
 
298
        self.assertEqual(response.previous.status, 301)
 
299
        self.assertEqual(response.previous.fromcache, True)
 
300
 
 
301
 
 
302
    def testGet301NoRedirect(self):
 
303
        # Test that we automatically follow 301 redirects
 
304
        # and that we cache the 301 response
 
305
        self.http.follow_redirects = False
 
306
        uri = urlparse.urljoin(base, "301/onestep.asis")
 
307
        destination = urlparse.urljoin(base, "302/final-destination.txt")
 
308
        (response, content) = self.http.request(uri, "GET")
 
309
        self.assertEqual(response.status, 301)
 
310
 
 
311
 
 
312
    def testGet302(self):
 
313
        # Test that we automatically follow 302 redirects
 
314
        # and that we DO NOT cache the 302 response
 
315
        uri = urlparse.urljoin(base, "302/onestep.asis")
 
316
        destination = urlparse.urljoin(base, "302/final-destination.txt")
 
317
        (response, content) = self.http.request(uri, "GET")
 
318
        self.assertEqual(response.status, 200)
 
319
        self.assertEqual(response['content-location'], destination)
 
320
        self.assertEqual(content, "This is the final destination.\n")
 
321
        self.assertEqual(response.previous.status, 302)
 
322
        self.assertEqual(response.previous.fromcache, False)
 
323
 
 
324
        uri = urlparse.urljoin(base, "302/onestep.asis")
 
325
        (response, content) = self.http.request(uri, "GET")
 
326
        self.assertEqual(response.status, 200)
 
327
        self.assertEqual(response.fromcache, True)
 
328
        self.assertEqual(response['content-location'], destination)
 
329
        self.assertEqual(content, "This is the final destination.\n")
 
330
        self.assertEqual(response.previous.status, 302)
 
331
        self.assertEqual(response.previous.fromcache, False)
 
332
        self.assertEqual(response.previous['content-location'], uri)
 
333
 
 
334
        uri = urlparse.urljoin(base, "302/twostep.asis")
 
335
 
 
336
        (response, content) = self.http.request(uri, "GET")
 
337
        self.assertEqual(response.status, 200)
 
338
        self.assertEqual(response.fromcache, True)
 
339
        self.assertEqual(content, "This is the final destination.\n")
 
340
        self.assertEqual(response.previous.status, 302)
 
341
        self.assertEqual(response.previous.fromcache, False)
 
342
 
 
343
    def testGet302RedirectionLimit(self):
 
344
        # Test that we can set a lower redirection limit
 
345
        # and that we raise an exception when we exceed
 
346
        # that limit.
 
347
        self.http.force_exception_to_status_code = False 
 
348
 
 
349
        uri = urlparse.urljoin(base, "302/twostep.asis")
 
350
        try:
 
351
            (response, content) = self.http.request(uri, "GET", redirections = 1)
 
352
            self.fail("This should not happen")
 
353
        except httplib2.RedirectLimit:
 
354
            pass
 
355
        except Exception, e:
 
356
            self.fail("Threw wrong kind of exception ")
 
357
 
 
358
        # Re-run the test with out the exceptions
 
359
        self.http.force_exception_to_status_code = True 
 
360
 
 
361
        (response, content) = self.http.request(uri, "GET", redirections = 1)
 
362
        self.assertEqual(response.status, 500)
 
363
        self.assertTrue(response.reason.startswith("Redirected more"))
 
364
        self.assertEqual("302", response['status'])
 
365
        self.assertTrue(content.startswith("<html>"))
 
366
        self.assertTrue(response.previous != None)
 
367
 
 
368
    def testGet302NoLocation(self):
 
369
        # Test that we throw an exception when we get
 
370
        # a 302 with no Location: header.
 
371
        self.http.force_exception_to_status_code = False 
 
372
        uri = urlparse.urljoin(base, "302/no-location.asis")
 
373
        try:
 
374
            (response, content) = self.http.request(uri, "GET")
 
375
            self.fail("Should never reach here")
 
376
        except httplib2.RedirectMissingLocation:
 
377
            pass
 
378
        except Exception, e:
 
379
            self.fail("Threw wrong kind of exception ")
 
380
 
 
381
        # Re-run the test with out the exceptions
 
382
        self.http.force_exception_to_status_code = True 
 
383
 
 
384
        (response, content) = self.http.request(uri, "GET")
 
385
        self.assertEqual(response.status, 500)
 
386
        self.assertTrue(response.reason.startswith("Redirected but"))
 
387
        self.assertEqual("302", response['status'])
 
388
        self.assertTrue(content.startswith("This is content"))
 
389
 
 
390
    def testGet302ViaHttps(self):
 
391
        # Google always redirects to http://google.com
 
392
        (response, content) = self.http.request("https://google.com", "GET")
 
393
        self.assertEqual(200, response.status)
 
394
        self.assertEqual(302, response.previous.status)
 
395
 
 
396
    def testGetViaHttps(self):
 
397
        # Test that we can handle HTTPS
 
398
        (response, content) = self.http.request("https://google.com/adsense/", "GET")
 
399
        self.assertEqual(200, response.status)
 
400
 
 
401
    def testGetViaHttpsSpecViolationOnLocation(self):
 
402
        # Test that we follow redirects through HTTPS
 
403
        # even if they violate the spec by including
 
404
        # a relative Location: header instead of an 
 
405
        # absolute one.
 
406
        (response, content) = self.http.request("https://google.com/adsense", "GET")
 
407
        self.assertEqual(200, response.status)
 
408
        self.assertNotEqual(None, response.previous)
 
409
 
 
410
 
 
411
    def testGetViaHttpsKeyCert(self):
 
412
        #  At this point I can only test
 
413
        #  that the key and cert files are passed in 
 
414
        #  correctly to httplib. It would be nice to have 
 
415
        #  a real https endpoint to test against.
 
416
        http = httplib2.Http(timeout=2)
 
417
 
 
418
        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
 
419
        try:
 
420
            (response, content) = http.request("https://bitworking.org", "GET")
 
421
        except:
 
422
            pass
 
423
        self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
 
424
        self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
 
425
 
 
426
        try:
 
427
            (response, content) = http.request("https://notthere.bitworking.org", "GET")
 
428
        except:
 
429
            pass
 
430
        self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
 
431
        self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
 
432
 
 
433
 
 
434
 
 
435
 
 
436
    def testGet303(self):
 
437
        # Do a follow-up GET on a Location: header
 
438
        # returned from a POST that gave a 303.
 
439
        uri = urlparse.urljoin(base, "303/303.cgi")
 
440
        (response, content) = self.http.request(uri, "POST", " ")
 
441
        self.assertEqual(response.status, 200)
 
442
        self.assertEqual(content, "This is the final destination.\n")
 
443
        self.assertEqual(response.previous.status, 303)
 
444
 
 
445
    def testGet303NoRedirect(self):
 
446
        # Do a follow-up GET on a Location: header
 
447
        # returned from a POST that gave a 303.
 
448
        self.http.follow_redirects = False
 
449
        uri = urlparse.urljoin(base, "303/303.cgi")
 
450
        (response, content) = self.http.request(uri, "POST", " ")
 
451
        self.assertEqual(response.status, 303)
 
452
 
 
453
    def test303ForDifferentMethods(self):
 
454
        # Test that all methods can be used
 
455
        uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi")
 
456
        for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]: 
 
457
            (response, content) = self.http.request(uri, method, body=" ")
 
458
            self.assertEqual(response['x-method'], method_on_303)
 
459
 
 
460
    def testGet304(self):
 
461
        # Test that we use ETags properly to validate our cache
 
462
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
463
        (response, content) = self.http.request(uri, "GET")
 
464
        self.assertNotEqual(response['etag'], "")
 
465
 
 
466
        (response, content) = self.http.request(uri, "GET")
 
467
        (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
 
468
        self.assertEqual(response.status, 200)
 
469
        self.assertEqual(response.fromcache, True)
 
470
 
 
471
        cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
 
472
        f = open(cache_file_name, "r")
 
473
        status_line = f.readline()
 
474
        f.close()
 
475
 
 
476
        self.assertTrue(status_line.startswith("status:"))
 
477
 
 
478
        (response, content) = self.http.request(uri, "HEAD")
 
479
        self.assertEqual(response.status, 200)
 
480
        self.assertEqual(response.fromcache, True)
 
481
 
 
482
        (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
 
483
        self.assertEqual(response.status, 206)
 
484
        self.assertEqual(response.fromcache, False)
 
485
 
 
486
    def testGetIgnoreEtag(self):
 
487
        # Test that we can forcibly ignore ETags 
 
488
        uri = urlparse.urljoin(base, "reflector/reflector.cgi")
 
489
        (response, content) = self.http.request(uri, "GET")
 
490
        self.assertNotEqual(response['etag'], "")
 
491
 
 
492
        (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
 
493
        d = self.reflector(content)
 
494
        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH')) 
 
495
 
 
496
        self.http.ignore_etag = True
 
497
        (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
 
498
        d = self.reflector(content)
 
499
        self.assertEqual(response.fromcache, False)
 
500
        self.assertFalse(d.has_key('HTTP_IF_NONE_MATCH')) 
 
501
 
 
502
    def testOverrideEtag(self):
 
503
        # Test that we can forcibly ignore ETags 
 
504
        uri = urlparse.urljoin(base, "reflector/reflector.cgi")
 
505
        (response, content) = self.http.request(uri, "GET")
 
506
        self.assertNotEqual(response['etag'], "")
 
507
 
 
508
        (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
 
509
        d = self.reflector(content)
 
510
        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH')) 
 
511
        self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred") 
 
512
 
 
513
        (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
 
514
        d = self.reflector(content)
 
515
        self.assertTrue(d.has_key('HTTP_IF_NONE_MATCH')) 
 
516
        self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred") 
 
517
 
 
518
#MAP-commented this out because it consistently fails
 
519
#    def testGet304EndToEnd(self):
 
520
#       # Test that end to end headers get overwritten in the cache
 
521
#        uri = urlparse.urljoin(base, "304/end2end.cgi")
 
522
#        (response, content) = self.http.request(uri, "GET")
 
523
#        self.assertNotEqual(response['etag'], "")
 
524
#        old_date = response['date']
 
525
#        time.sleep(2)
 
526
#
 
527
#        (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
 
528
#        # The response should be from the cache, but the Date: header should be updated.
 
529
#        new_date = response['date']
 
530
#        self.assertNotEqual(new_date, old_date)
 
531
#        self.assertEqual(response.status, 200)
 
532
#        self.assertEqual(response.fromcache, True)
 
533
 
 
534
    def testGet304LastModified(self):
 
535
        # Test that we can still handle a 304 
 
536
        # by only using the last-modified cache validator.
 
537
        uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
 
538
        (response, content) = self.http.request(uri, "GET")
 
539
 
 
540
        self.assertNotEqual(response['last-modified'], "")
 
541
        (response, content) = self.http.request(uri, "GET")
 
542
        (response, content) = self.http.request(uri, "GET")
 
543
        self.assertEqual(response.status, 200)
 
544
        self.assertEqual(response.fromcache, True)
 
545
 
 
546
    def testGet307(self):
 
547
        # Test that we do follow 307 redirects but
 
548
        # do not cache the 307
 
549
        uri = urlparse.urljoin(base, "307/onestep.asis")
 
550
        (response, content) = self.http.request(uri, "GET")
 
551
        self.assertEqual(response.status, 200)
 
552
        self.assertEqual(content, "This is the final destination.\n")
 
553
        self.assertEqual(response.previous.status, 307)
 
554
        self.assertEqual(response.previous.fromcache, False)
 
555
 
 
556
        (response, content) = self.http.request(uri, "GET")
 
557
        self.assertEqual(response.status, 200)
 
558
        self.assertEqual(response.fromcache, True)
 
559
        self.assertEqual(content, "This is the final destination.\n")
 
560
        self.assertEqual(response.previous.status, 307)
 
561
        self.assertEqual(response.previous.fromcache, False)
 
562
 
 
563
    def testGet410(self):
 
564
        # Test that we pass 410's through
 
565
        uri = urlparse.urljoin(base, "410/410.asis")
 
566
        (response, content) = self.http.request(uri, "GET")
 
567
        self.assertEqual(response.status, 410)
 
568
 
 
569
    def testVaryHeaderSimple(self):
 
570
        """
 
571
        RFC 2616 13.6
 
572
        When the cache receives a subsequent request whose Request-URI
 
573
        specifies one or more cache entries including a Vary header field,
 
574
        the cache MUST NOT use such a cache entry to construct a response
 
575
        to the new request unless all of the selecting request-headers
 
576
        present in the new request match the corresponding stored
 
577
        request-headers in the original request.
 
578
        """
 
579
        # test that the vary header is sent
 
580
        uri = urlparse.urljoin(base, "vary/accept.asis")
 
581
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 
582
        self.assertEqual(response.status, 200)
 
583
        self.assertTrue(response.has_key('vary'))
 
584
 
 
585
        # get the resource again, from the cache since accept header in this
 
586
        # request is the same as the request
 
587
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 
588
        self.assertEqual(response.status, 200)
 
589
        self.assertEqual(response.fromcache, True, msg="Should be from cache")
 
590
 
 
591
        # get the resource again, not from cache since Accept headers does not match
 
592
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
 
593
        self.assertEqual(response.status, 200)
 
594
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
 
595
 
 
596
        # get the resource again, without any Accept header, so again no match
 
597
        (response, content) = self.http.request(uri, "GET")
 
598
        self.assertEqual(response.status, 200)
 
599
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
 
600
 
 
601
    def testNoVary(self):
 
602
        # when there is no vary, a different Accept header (e.g.) should not
 
603
        # impact if the cache is used
 
604
        # test that the vary header is not sent
 
605
        uri = urlparse.urljoin(base, "vary/no-vary.asis")
 
606
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 
607
        self.assertEqual(response.status, 200)
 
608
        self.assertFalse(response.has_key('vary'))
 
609
 
 
610
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 
611
        self.assertEqual(response.status, 200)
 
612
        self.assertEqual(response.fromcache, True, msg="Should be from cache")
 
613
        
 
614
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
 
615
        self.assertEqual(response.status, 200)
 
616
        self.assertEqual(response.fromcache, True, msg="Should be from cache")
 
617
 
 
618
    def testVaryHeaderDouble(self):
 
619
        uri = urlparse.urljoin(base, "vary/accept-double.asis")
 
620
        (response, content) = self.http.request(uri, "GET", headers={
 
621
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
 
622
        self.assertEqual(response.status, 200)
 
623
        self.assertTrue(response.has_key('vary'))
 
624
 
 
625
        # we are from cache
 
626
        (response, content) = self.http.request(uri, "GET", headers={
 
627
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
 
628
        self.assertEqual(response.fromcache, True, msg="Should be from cache")
 
629
 
 
630
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 
631
        self.assertEqual(response.status, 200)
 
632
        self.assertEqual(response.fromcache, False)
 
633
 
 
634
        # get the resource again, not from cache, varied headers don't match exact
 
635
        (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
 
636
        self.assertEqual(response.status, 200)
 
637
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
 
638
 
 
639
 
 
640
    def testHeadGZip(self):
 
641
        # Test that we don't try to decompress a HEAD response 
 
642
        uri = urlparse.urljoin(base, "gzip/final-destination.txt")
 
643
        (response, content) = self.http.request(uri, "HEAD")
 
644
        self.assertEqual(response.status, 200)
 
645
        self.assertNotEqual(int(response['content-length']), 0)
 
646
        self.assertEqual(content, "")
 
647
 
 
648
    def testGetGZip(self):
 
649
        # Test that we support gzip compression
 
650
        uri = urlparse.urljoin(base, "gzip/final-destination.txt")
 
651
        (response, content) = self.http.request(uri, "GET")
 
652
        self.assertEqual(response.status, 200)
 
653
        self.assertFalse(response.has_key('content-encoding'))
 
654
        self.assertTrue(response.has_key('-content-encoding'))
 
655
        self.assertEqual(int(response['content-length']), len("This is the final destination.\n"))
 
656
        self.assertEqual(content, "This is the final destination.\n")
 
657
 
 
658
    def testGetGZipFailure(self):
 
659
        # Test that we raise a good exception when the gzip fails
 
660
        self.http.force_exception_to_status_code = False 
 
661
        uri = urlparse.urljoin(base, "gzip/failed-compression.asis")
 
662
        try:
 
663
            (response, content) = self.http.request(uri, "GET")
 
664
            self.fail("Should never reach here")
 
665
        except httplib2.FailedToDecompressContent:
 
666
            pass
 
667
        except Exception:
 
668
            self.fail("Threw wrong kind of exception")
 
669
 
 
670
        # Re-run the test with out the exceptions
 
671
        self.http.force_exception_to_status_code = True 
 
672
 
 
673
        (response, content) = self.http.request(uri, "GET")
 
674
        self.assertEqual(response.status, 500)
 
675
        self.assertTrue(response.reason.startswith("Content purported"))
 
676
 
 
677
    def testTimeout(self):
 
678
        self.http.force_exception_to_status_code = True 
 
679
        uri = urlparse.urljoin(base, "timeout/timeout.cgi")
 
680
        try:
 
681
            import socket
 
682
            socket.setdefaulttimeout(1) 
 
683
        except:
 
684
            # Don't run the test if we can't set the timeout
 
685
            return 
 
686
        (response, content) = self.http.request(uri)
 
687
        self.assertEqual(response.status, 408)
 
688
        self.assertTrue(response.reason.startswith("Request Timeout"))
 
689
        self.assertTrue(content.startswith("Request Timeout"))
 
690
 
 
691
    def testIndividualTimeout(self):
 
692
        uri = urlparse.urljoin(base, "timeout/timeout.cgi")
 
693
        http = httplib2.Http(timeout=1)
 
694
        http.force_exception_to_status_code = True 
 
695
 
 
696
        (response, content) = http.request(uri)
 
697
        self.assertEqual(response.status, 408)
 
698
        self.assertTrue(response.reason.startswith("Request Timeout"))
 
699
        self.assertTrue(content.startswith("Request Timeout"))
 
700
 
 
701
 
 
702
    def testHTTPSInitTimeout(self):
 
703
        c = httplib2.HTTPSConnectionWithTimeout('localhost', 80, timeout=47)
 
704
        self.assertEqual(47, c.timeout)
 
705
 
 
706
    def testGetDeflate(self):
 
707
        # Test that we support deflate compression
 
708
        uri = urlparse.urljoin(base, "deflate/deflated.asis")
 
709
        (response, content) = self.http.request(uri, "GET")
 
710
        self.assertEqual(response.status, 200)
 
711
        self.assertFalse(response.has_key('content-encoding'))
 
712
        self.assertEqual(int(response['content-length']), len("This is the final destination."))
 
713
        self.assertEqual(content, "This is the final destination.")
 
714
 
 
715
    def testGetDeflateFailure(self):
 
716
        # Test that we raise a good exception when the deflate fails
 
717
        self.http.force_exception_to_status_code = False 
 
718
 
 
719
        uri = urlparse.urljoin(base, "deflate/failed-compression.asis")
 
720
        try:
 
721
            (response, content) = self.http.request(uri, "GET")
 
722
            self.fail("Should never reach here")
 
723
        except httplib2.FailedToDecompressContent:
 
724
            pass
 
725
        except Exception:
 
726
            self.fail("Threw wrong kind of exception")
 
727
 
 
728
        # Re-run the test with out the exceptions
 
729
        self.http.force_exception_to_status_code = True 
 
730
 
 
731
        (response, content) = self.http.request(uri, "GET")
 
732
        self.assertEqual(response.status, 500)
 
733
        self.assertTrue(response.reason.startswith("Content purported"))
 
734
 
 
735
    def testGetDuplicateHeaders(self):
 
736
        # Test that duplicate headers get concatenated via ','
 
737
        uri = urlparse.urljoin(base, "duplicate-headers/multilink.asis")
 
738
        (response, content) = self.http.request(uri, "GET")
 
739
        self.assertEqual(response.status, 200)
 
740
        self.assertEqual(content, "This is content\n")
 
741
        self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
 
742
 
 
743
    def testGetCacheControlNoCache(self):
 
744
        # Test Cache-Control: no-cache on requests
 
745
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
746
        (response, content) = self.http.request(uri, "GET")
 
747
        self.assertNotEqual(response['etag'], "")
 
748
        (response, content) = self.http.request(uri, "GET")
 
749
        self.assertEqual(response.status, 200)
 
750
        self.assertEqual(response.fromcache, True)
 
751
 
 
752
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
 
753
        self.assertEqual(response.status, 200)
 
754
        self.assertEqual(response.fromcache, False)
 
755
 
 
756
    def testGetCacheControlPragmaNoCache(self):
 
757
        # Test Pragma: no-cache on requests
 
758
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
759
        (response, content) = self.http.request(uri, "GET")
 
760
        self.assertNotEqual(response['etag'], "")
 
761
        (response, content) = self.http.request(uri, "GET")
 
762
        self.assertEqual(response.status, 200)
 
763
        self.assertEqual(response.fromcache, True)
 
764
 
 
765
        (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
 
766
        self.assertEqual(response.status, 200)
 
767
        self.assertEqual(response.fromcache, False)
 
768
 
 
769
    def testGetCacheControlNoStoreRequest(self):
 
770
        # A no-store request means that the response should not be stored.
 
771
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
772
 
 
773
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
 
774
        self.assertEqual(response.status, 200)
 
775
        self.assertEqual(response.fromcache, False)
 
776
 
 
777
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
 
778
        self.assertEqual(response.status, 200)
 
779
        self.assertEqual(response.fromcache, False)
 
780
 
 
781
    def testGetCacheControlNoStoreResponse(self):
 
782
        # A no-store response means that the response should not be stored.
 
783
        uri = urlparse.urljoin(base, "no-store/no-store.asis")
 
784
 
 
785
        (response, content) = self.http.request(uri, "GET")
 
786
        self.assertEqual(response.status, 200)
 
787
        self.assertEqual(response.fromcache, False)
 
788
 
 
789
        (response, content) = self.http.request(uri, "GET")
 
790
        self.assertEqual(response.status, 200)
 
791
        self.assertEqual(response.fromcache, False)
 
792
 
 
793
    def testGetCacheControlNoCacheNoStoreRequest(self):
 
794
        # Test that a no-store, no-cache clears the entry from the cache
 
795
        # even if it was cached previously.
 
796
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
797
 
 
798
        (response, content) = self.http.request(uri, "GET")
 
799
        (response, content) = self.http.request(uri, "GET")
 
800
        self.assertEqual(response.fromcache, True)
 
801
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
 
802
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
 
803
        self.assertEqual(response.status, 200)
 
804
        self.assertEqual(response.fromcache, False)
 
805
 
 
806
    def testUpdateInvalidatesCache(self):
 
807
        # Test that calling PUT or DELETE on a 
 
808
        # URI that is cache invalidates that cache.
 
809
        uri = urlparse.urljoin(base, "304/test_etag.txt")
 
810
 
 
811
        (response, content) = self.http.request(uri, "GET")
 
812
        (response, content) = self.http.request(uri, "GET")
 
813
        self.assertEqual(response.fromcache, True)
 
814
        (response, content) = self.http.request(uri, "DELETE")
 
815
        self.assertEqual(response.status, 405)
 
816
 
 
817
        (response, content) = self.http.request(uri, "GET")
 
818
        self.assertEqual(response.fromcache, False)
 
819
 
 
820
    def testUpdateUsesCachedETag(self):
 
821
        # Test that we natively support http://www.w3.org/1999/04/Editing/ 
 
822
        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
 
823
 
 
824
        (response, content) = self.http.request(uri, "GET")
 
825
        self.assertEqual(response.status, 200)
 
826
        self.assertEqual(response.fromcache, False)
 
827
        (response, content) = self.http.request(uri, "GET")
 
828
        self.assertEqual(response.status, 200)
 
829
        self.assertEqual(response.fromcache, True)
 
830
        (response, content) = self.http.request(uri, "PUT", body="foo")
 
831
        self.assertEqual(response.status, 200)
 
832
        (response, content) = self.http.request(uri, "PUT", body="foo")
 
833
        self.assertEqual(response.status, 412)
 
834
 
 
835
    def testUpdateUsesCachedETagAndOCMethod(self):
 
836
        # Test that we natively support http://www.w3.org/1999/04/Editing/ 
 
837
        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
 
838
 
 
839
        (response, content) = self.http.request(uri, "GET")
 
840
        self.assertEqual(response.status, 200)
 
841
        self.assertEqual(response.fromcache, False)
 
842
        (response, content) = self.http.request(uri, "GET")
 
843
        self.assertEqual(response.status, 200)
 
844
        self.assertEqual(response.fromcache, True)
 
845
        self.http.optimistic_concurrency_methods.append("DELETE")
 
846
        (response, content) = self.http.request(uri, "DELETE")
 
847
        self.assertEqual(response.status, 200)
 
848
 
 
849
 
 
850
    def testUpdateUsesCachedETagOverridden(self):
 
851
        # Test that we natively support http://www.w3.org/1999/04/Editing/ 
 
852
        uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
 
853
 
 
854
        (response, content) = self.http.request(uri, "GET")
 
855
        self.assertEqual(response.status, 200)
 
856
        self.assertEqual(response.fromcache, False)
 
857
        (response, content) = self.http.request(uri, "GET")
 
858
        self.assertEqual(response.status, 200)
 
859
        self.assertEqual(response.fromcache, True)
 
860
        (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
 
861
        self.assertEqual(response.status, 412)
 
862
 
 
863
    def testBasicAuth(self):
 
864
        # Test Basic Authentication
 
865
        uri = urlparse.urljoin(base, "basic/file.txt")
 
866
        (response, content) = self.http.request(uri, "GET")
 
867
        self.assertEqual(response.status, 401)
 
868
 
 
869
        uri = urlparse.urljoin(base, "basic/")
 
870
        (response, content) = self.http.request(uri, "GET")
 
871
        self.assertEqual(response.status, 401)
 
872
 
 
873
        self.http.add_credentials('joe', 'password')
 
874
        (response, content) = self.http.request(uri, "GET")
 
875
        self.assertEqual(response.status, 200)
 
876
 
 
877
        uri = urlparse.urljoin(base, "basic/file.txt")
 
878
        (response, content) = self.http.request(uri, "GET")
 
879
        self.assertEqual(response.status, 200)
 
880
 
 
881
    def testBasicAuthWithDomain(self):
 
882
        # Test Basic Authentication
 
883
        uri = urlparse.urljoin(base, "basic/file.txt")
 
884
        (response, content) = self.http.request(uri, "GET")
 
885
        self.assertEqual(response.status, 401)
 
886
 
 
887
        uri = urlparse.urljoin(base, "basic/")
 
888
        (response, content) = self.http.request(uri, "GET")
 
889
        self.assertEqual(response.status, 401)
 
890
 
 
891
        self.http.add_credentials('joe', 'password', "example.org")
 
892
        (response, content) = self.http.request(uri, "GET")
 
893
        self.assertEqual(response.status, 401)
 
894
 
 
895
        uri = urlparse.urljoin(base, "basic/file.txt")
 
896
        (response, content) = self.http.request(uri, "GET")
 
897
        self.assertEqual(response.status, 401)
 
898
 
 
899
        domain = urlparse.urlparse(base)[1] 
 
900
        self.http.add_credentials('joe', 'password', domain)
 
901
        (response, content) = self.http.request(uri, "GET")
 
902
        self.assertEqual(response.status, 200)
 
903
 
 
904
        uri = urlparse.urljoin(base, "basic/file.txt")
 
905
        (response, content) = self.http.request(uri, "GET")
 
906
        self.assertEqual(response.status, 200)
 
907
 
 
908
 
 
909
 
 
910
 
 
911
 
 
912
 
 
913
    def testBasicAuthTwoDifferentCredentials(self):
 
914
        # Test Basic Authentication with multiple sets of credentials
 
915
        uri = urlparse.urljoin(base, "basic2/file.txt")
 
916
        (response, content) = self.http.request(uri, "GET")
 
917
        self.assertEqual(response.status, 401)
 
918
 
 
919
        uri = urlparse.urljoin(base, "basic2/")
 
920
        (response, content) = self.http.request(uri, "GET")
 
921
        self.assertEqual(response.status, 401)
 
922
 
 
923
        self.http.add_credentials('fred', 'barney')
 
924
        (response, content) = self.http.request(uri, "GET")
 
925
        self.assertEqual(response.status, 200)
 
926
 
 
927
        uri = urlparse.urljoin(base, "basic2/file.txt")
 
928
        (response, content) = self.http.request(uri, "GET")
 
929
        self.assertEqual(response.status, 200)
 
930
 
 
931
    def testBasicAuthNested(self):
 
932
        # Test Basic Authentication with resources
 
933
        # that are nested
 
934
        uri = urlparse.urljoin(base, "basic-nested/")
 
935
        (response, content) = self.http.request(uri, "GET")
 
936
        self.assertEqual(response.status, 401)
 
937
 
 
938
        uri = urlparse.urljoin(base, "basic-nested/subdir")
 
939
        (response, content) = self.http.request(uri, "GET")
 
940
        self.assertEqual(response.status, 401)
 
941
 
 
942
        # Now add in credentials one at a time and test.
 
943
        self.http.add_credentials('joe', 'password')
 
944
 
 
945
        uri = urlparse.urljoin(base, "basic-nested/")
 
946
        (response, content) = self.http.request(uri, "GET")
 
947
        self.assertEqual(response.status, 200)
 
948
 
 
949
        uri = urlparse.urljoin(base, "basic-nested/subdir")
 
950
        (response, content) = self.http.request(uri, "GET")
 
951
        self.assertEqual(response.status, 401)
 
952
 
 
953
        self.http.add_credentials('fred', 'barney')
 
954
 
 
955
        uri = urlparse.urljoin(base, "basic-nested/")
 
956
        (response, content) = self.http.request(uri, "GET")
 
957
        self.assertEqual(response.status, 200)
 
958
 
 
959
        uri = urlparse.urljoin(base, "basic-nested/subdir")
 
960
        (response, content) = self.http.request(uri, "GET")
 
961
        self.assertEqual(response.status, 200)
 
962
 
 
963
    def testDigestAuth(self):
 
964
        # Test that we support Digest Authentication
 
965
        uri = urlparse.urljoin(base, "digest/")
 
966
        (response, content) = self.http.request(uri, "GET")
 
967
        self.assertEqual(response.status, 401)
 
968
 
 
969
        self.http.add_credentials('joe', 'password')
 
970
        (response, content) = self.http.request(uri, "GET")
 
971
        self.assertEqual(response.status, 200)
 
972
 
 
973
        uri = urlparse.urljoin(base, "digest/file.txt")
 
974
        (response, content) = self.http.request(uri, "GET")
 
975
 
 
976
    def testDigestAuthNextNonceAndNC(self):
 
977
        # Test that if the server sets nextnonce that we reset
 
978
        # the nonce count back to 1
 
979
        uri = urlparse.urljoin(base, "digest/file.txt")
 
980
        self.http.add_credentials('joe', 'password')
 
981
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 
982
        info = httplib2._parse_www_authenticate(response, 'authentication-info')
 
983
        self.assertEqual(response.status, 200)
 
984
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 
985
        info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
 
986
        self.assertEqual(response.status, 200)
 
987
 
 
988
        if info.has_key('nextnonce'):
 
989
            self.assertEqual(info2['nc'], 1)
 
990
 
 
991
    def testDigestAuthStale(self):
 
992
        # Test that we can handle a nonce becoming stale
 
993
        uri = urlparse.urljoin(base, "digest-expire/file.txt")
 
994
        self.http.add_credentials('joe', 'password')
 
995
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 
996
        info = httplib2._parse_www_authenticate(response, 'authentication-info')
 
997
        self.assertEqual(response.status, 200)
 
998
 
 
999
        time.sleep(3)
 
1000
        # Sleep long enough that the nonce becomes stale
 
1001
 
 
1002
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 
1003
        self.assertFalse(response.fromcache)
 
1004
        self.assertTrue(response._stale_digest)
 
1005
        info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
 
1006
        self.assertEqual(response.status, 200)
 
1007
 
 
1008
    def reflector(self, content):
 
1009
        return  dict( [tuple(x.split("=", 1)) for x in content.strip().split("\n")] )
 
1010
 
 
1011
    def testReflector(self):
 
1012
        uri = urlparse.urljoin(base, "reflector/reflector.cgi")
 
1013
        (response, content) = self.http.request(uri, "GET")
 
1014
        d = self.reflector(content)
 
1015
        self.assertTrue(d.has_key('HTTP_USER_AGENT')) 
 
1016
 
 
1017
    def testConnectionClose(self):
 
1018
        uri = "http://www.google.com/"
 
1019
        (response, content) = self.http.request(uri, "GET")
 
1020
        for c in self.http.connections.values():
 
1021
            self.assertNotEqual(None, c.sock)
 
1022
        (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
 
1023
        for c in self.http.connections.values():
 
1024
            self.assertEqual(None, c.sock)
 
1025
 
 
1026
 
 
1027
try:
 
1028
    import memcache
 
1029
    class HttpTestMemCached(HttpTest):
 
1030
        def setUp(self):
 
1031
            self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
 
1032
            #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
 
1033
            self.http = httplib2.Http(self.cache)
 
1034
            self.cache.flush_all()
 
1035
            # Not exactly sure why the sleep is needed here, but
 
1036
            # if not present then some unit tests that rely on caching
 
1037
            # fail. Memcached seems to lose some sets immediately
 
1038
            # after a flush_all if the set is to a value that
 
1039
            # was previously cached. (Maybe the flush is handled async?)
 
1040
            time.sleep(1)
 
1041
            self.http.clear_credentials()
 
1042
except:
 
1043
    pass
 
1044
 
 
1045
 
 
1046
 
 
1047
 
 
1048
# ------------------------------------------------------------------------
 
1049
 
 
1050
class HttpPrivateTest(unittest.TestCase):
 
1051
 
 
1052
    def testParseCacheControl(self):
 
1053
        # Test that we can parse the Cache-Control header
 
1054
        self.assertEqual({}, httplib2._parse_cache_control({}))
 
1055
        self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
 
1056
        cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
 
1057
        self.assertEqual(cc['no-cache'], 1)
 
1058
        self.assertEqual(cc['max-age'], '7200')
 
1059
        cc = httplib2._parse_cache_control({'cache-control': ' , '})
 
1060
        self.assertEqual(cc[''], 1)
 
1061
 
 
1062
        try:
 
1063
            cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
 
1064
            self.assertTrue("max-age" in cc)
 
1065
        except:
 
1066
            self.fail("Should not throw exception")
 
1067
 
 
1068
    def testNormalizeHeaders(self):
 
1069
        # Test that we normalize headers to lowercase 
 
1070
        h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
 
1071
        self.assertTrue(h.has_key('cache-control'))
 
1072
        self.assertTrue(h.has_key('other'))
 
1073
        self.assertEqual('Stuff', h['other'])
 
1074
 
 
1075
    def testExpirationModelTransparent(self):
 
1076
        # Test that no-cache makes our request TRANSPARENT
 
1077
        response_headers = {
 
1078
            'cache-control': 'max-age=7200'
 
1079
        }
 
1080
        request_headers = {
 
1081
            'cache-control': 'no-cache'
 
1082
        }
 
1083
        self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
 
1084
 
 
1085
    def testMaxAgeNonNumeric(self):
 
1086
        # Test that no-cache makes our request TRANSPARENT
 
1087
        response_headers = {
 
1088
            'cache-control': 'max-age=fred, min-fresh=barney'
 
1089
        }
 
1090
        request_headers = {
 
1091
        }
 
1092
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1093
 
 
1094
 
 
1095
    def testExpirationModelNoCacheResponse(self):
 
1096
        # The date and expires point to an entry that should be
 
1097
        # FRESH, but the no-cache over-rides that.
 
1098
        now = time.time()
 
1099
        response_headers = {
 
1100
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 
1101
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
 
1102
            'cache-control': 'no-cache'
 
1103
        }
 
1104
        request_headers = {
 
1105
        }
 
1106
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1107
 
 
1108
    def testExpirationModelStaleRequestMustReval(self):
 
1109
        # must-revalidate forces STALE
 
1110
        self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
 
1111
 
 
1112
    def testExpirationModelStaleResponseMustReval(self):
 
1113
        # must-revalidate forces STALE
 
1114
        self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
 
1115
 
 
1116
    def testExpirationModelFresh(self):
 
1117
        response_headers = {
 
1118
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
 
1119
            'cache-control': 'max-age=2'
 
1120
        }
 
1121
        request_headers = {
 
1122
        }
 
1123
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 
1124
        time.sleep(3)
 
1125
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1126
 
 
1127
    def testExpirationMaxAge0(self):
 
1128
        response_headers = {
 
1129
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
 
1130
            'cache-control': 'max-age=0'
 
1131
        }
 
1132
        request_headers = {
 
1133
        }
 
1134
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1135
 
 
1136
    def testExpirationModelDateAndExpires(self):
 
1137
        now = time.time()
 
1138
        response_headers = {
 
1139
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 
1140
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
 
1141
        }
 
1142
        request_headers = {
 
1143
        }
 
1144
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 
1145
        time.sleep(3)
 
1146
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1147
 
 
1148
    def testExpiresZero(self):
 
1149
        now = time.time()
 
1150
        response_headers = {
 
1151
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 
1152
            'expires': "0",
 
1153
        }
 
1154
        request_headers = {
 
1155
        }
 
1156
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1157
 
 
1158
    def testExpirationModelDateOnly(self):
 
1159
        now = time.time()
 
1160
        response_headers = {
 
1161
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
 
1162
        }
 
1163
        request_headers = {
 
1164
        }
 
1165
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1166
 
 
1167
    def testExpirationModelOnlyIfCached(self):
 
1168
        response_headers = {
 
1169
        }
 
1170
        request_headers = {
 
1171
            'cache-control': 'only-if-cached',
 
1172
        }
 
1173
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 
1174
 
 
1175
    def testExpirationModelMaxAgeBoth(self):
 
1176
        now = time.time()
 
1177
        response_headers = {
 
1178
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 
1179
            'cache-control': 'max-age=2'
 
1180
        }
 
1181
        request_headers = {
 
1182
            'cache-control': 'max-age=0'
 
1183
        }
 
1184
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1185
 
 
1186
    def testExpirationModelDateAndExpiresMinFresh1(self):
 
1187
        now = time.time()
 
1188
        response_headers = {
 
1189
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 
1190
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
 
1191
        }
 
1192
        request_headers = {
 
1193
            'cache-control': 'min-fresh=2'
 
1194
        }
 
1195
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 
1196
 
 
1197
    def testExpirationModelDateAndExpiresMinFresh2(self):
 
1198
        now = time.time()
 
1199
        response_headers = {
 
1200
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 
1201
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
 
1202
        }
 
1203
        request_headers = {
 
1204
            'cache-control': 'min-fresh=2'
 
1205
        }
 
1206
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 
1207
 
 
1208
    def testParseWWWAuthenticateEmpty(self):
 
1209
        res = httplib2._parse_www_authenticate({})
 
1210
        self.assertEqual(len(res.keys()), 0) 
 
1211
 
 
1212
    def testParseWWWAuthenticate(self):
 
1213
        # different uses of spaces around commas
 
1214
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
 
1215
        self.assertEqual(len(res.keys()), 1)
 
1216
        self.assertEqual(len(res['test'].keys()), 5)
 
1217
        
 
1218
        # tokens with non-alphanum
 
1219
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
 
1220
        self.assertEqual(len(res.keys()), 1)
 
1221
        self.assertEqual(len(res['t*!%#st'].keys()), 2)
 
1222
        
 
1223
        # quoted string with quoted pairs
 
1224
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
 
1225
        self.assertEqual(len(res.keys()), 1)
 
1226
        self.assertEqual(res['test']['realm'], 'a "test" realm')
 
1227
 
 
1228
    def testParseWWWAuthenticateStrict(self):
 
1229
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
 
1230
        self.testParseWWWAuthenticate();
 
1231
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
 
1232
 
 
1233
    def testParseWWWAuthenticateBasic(self):
 
1234
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
 
1235
        basic = res['basic']
 
1236
        self.assertEqual('me', basic['realm'])
 
1237
 
 
1238
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
 
1239
        basic = res['basic']
 
1240
        self.assertEqual('me', basic['realm'])
 
1241
        self.assertEqual('MD5', basic['algorithm'])
 
1242
 
 
1243
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
 
1244
        basic = res['basic']
 
1245
        self.assertEqual('me', basic['realm'])
 
1246
        self.assertEqual('MD5', basic['algorithm'])
 
1247
 
 
1248
    def testParseWWWAuthenticateBasic2(self):
 
1249
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
 
1250
        basic = res['basic']
 
1251
        self.assertEqual('me', basic['realm'])
 
1252
        self.assertEqual('fred', basic['other'])
 
1253
 
 
1254
    def testParseWWWAuthenticateBasic3(self):
 
1255
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
 
1256
        basic = res['basic']
 
1257
        self.assertEqual('me', basic['realm'])
 
1258
 
 
1259
 
 
1260
    def testParseWWWAuthenticateDigest(self):
 
1261
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 
1262
                'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
 
1263
        digest = res['digest']
 
1264
        self.assertEqual('testrealm@host.com', digest['realm'])
 
1265
        self.assertEqual('auth,auth-int', digest['qop'])
 
1266
 
 
1267
 
 
1268
    def testParseWWWAuthenticateMultiple(self):
 
1269
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 
1270
                'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
 
1271
        digest = res['digest']
 
1272
        self.assertEqual('testrealm@host.com', digest['realm'])
 
1273
        self.assertEqual('auth,auth-int', digest['qop'])
 
1274
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
 
1275
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
 
1276
        basic = res['basic']
 
1277
        self.assertEqual('me', basic['realm'])
 
1278
 
 
1279
    def testParseWWWAuthenticateMultiple2(self):
 
1280
        # Handle an added comma between challenges, which might get thrown in if the challenges were
 
1281
        # originally sent in separate www-authenticate headers.
 
1282
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 
1283
                'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
 
1284
        digest = res['digest']
 
1285
        self.assertEqual('testrealm@host.com', digest['realm'])
 
1286
        self.assertEqual('auth,auth-int', digest['qop'])
 
1287
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
 
1288
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
 
1289
        basic = res['basic']
 
1290
        self.assertEqual('me', basic['realm'])
 
1291
 
 
1292
    def testParseWWWAuthenticateMultiple3(self):
 
1293
        # Handle an added comma between challenges, which might get thrown in if the challenges were
 
1294
        # originally sent in separate www-authenticate headers.
 
1295
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 
1296
                'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
 
1297
        digest = res['digest']
 
1298
        self.assertEqual('testrealm@host.com', digest['realm'])
 
1299
        self.assertEqual('auth,auth-int', digest['qop'])
 
1300
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
 
1301
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
 
1302
        basic = res['basic']
 
1303
        self.assertEqual('me', basic['realm'])
 
1304
        wsse = res['wsse']
 
1305
        self.assertEqual('foo', wsse['realm'])
 
1306
        self.assertEqual('UsernameToken', wsse['profile'])
 
1307
 
 
1308
    def testParseWWWAuthenticateMultiple4(self):
 
1309
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 
1310
                'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'}) 
 
1311
        digest = res['digest']
 
1312
        self.assertEqual('test-real.m@host.com', digest['realm'])
 
1313
        self.assertEqual('\tauth,auth-int', digest['qop'])
 
1314
        self.assertEqual('(*)&^&$%#', digest['nonce'])
 
1315
 
 
1316
    def testParseWWWAuthenticateMoreQuoteCombos(self):
 
1317
        res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
 
1318
        digest = res['digest']
 
1319
        self.assertEqual('myrealm', digest['realm'])
 
1320
 
 
1321
    def testDigestObject(self):
 
1322
        credentials = ('joe', 'password')
 
1323
        host = None
 
1324
        request_uri = '/projects/httplib2/test/digest/' 
 
1325
        headers = {}
 
1326
        response = {
 
1327
            'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
 
1328
        }
 
1329
        content = ""
 
1330
        
 
1331
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
 
1332
        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 
 
1333
        our_request = "Authorization: %s" % headers['Authorization']
 
1334
        working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
 
1335
        self.assertEqual(our_request, working_request)
 
1336
 
 
1337
 
 
1338
    def testDigestObjectStale(self):
 
1339
        credentials = ('joe', 'password')
 
1340
        host = None
 
1341
        request_uri = '/projects/httplib2/test/digest/' 
 
1342
        headers = {}
 
1343
        response = httplib2.Response({ })
 
1344
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
 
1345
        response.status = 401
 
1346
        content = ""
 
1347
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
 
1348
        # Returns true to force a retry
 
1349
        self.assertTrue( d.response(response, content) )
 
1350
 
 
1351
    def testDigestObjectAuthInfo(self):
 
1352
        credentials = ('joe', 'password')
 
1353
        host = None
 
1354
        request_uri = '/projects/httplib2/test/digest/' 
 
1355
        headers = {}
 
1356
        response = httplib2.Response({ })
 
1357
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
 
1358
        response['authentication-info'] = 'nextnonce="fred"'
 
1359
        content = ""
 
1360
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
 
1361
        # Returns true to force a retry
 
1362
        self.assertFalse( d.response(response, content) )
 
1363
        self.assertEqual('fred', d.challenge['nonce'])
 
1364
        self.assertEqual(1, d.challenge['nc'])
 
1365
 
 
1366
    def testWsseAlgorithm(self):
 
1367
        digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
 
1368
        expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY="
 
1369
        self.assertEqual(expected, digest)
 
1370
 
 
1371
    def testEnd2End(self):
 
1372
        # one end to end header
 
1373
        response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
 
1374
        end2end = httplib2._get_end2end_headers(response)
 
1375
        self.assertTrue('content-type' in end2end)
 
1376
        self.assertTrue('te' not in end2end)
 
1377
        self.assertTrue('connection' not in end2end)
 
1378
 
 
1379
        # one end to end header that gets eliminated
 
1380
        response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
 
1381
        end2end = httplib2._get_end2end_headers(response)
 
1382
        self.assertTrue('content-type' not in end2end)
 
1383
        self.assertTrue('te' not in end2end)
 
1384
        self.assertTrue('connection' not in end2end)
 
1385
 
 
1386
        # Degenerate case of no headers
 
1387
        response = {}
 
1388
        end2end = httplib2._get_end2end_headers(response)
 
1389
        self.assertEquals(0, len(end2end))
 
1390
 
 
1391
        # Degenerate case of connection referrring to a header not passed in 
 
1392
        response = {'connection': 'content-type'}
 
1393
        end2end = httplib2._get_end2end_headers(response)
 
1394
        self.assertEquals(0, len(end2end))
 
1395
 
 
1396
if __name__ == '__main__':
 
1397
    unittest.main()