1
# Copyright (c) 2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.web._auth}.
9
from zope.interface import implements
10
from zope.interface.verify import verifyObject
12
from twisted.trial import unittest
14
from twisted.internet.address import IPv4Address
16
from twisted.cred import error, portal
17
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
18
from twisted.cred.checkers import ANONYMOUS, AllowAnonymousAccess
19
from twisted.cred.credentials import IUsernamePassword
21
from twisted.web.iweb import ICredentialFactory
22
from twisted.web.resource import IResource, Resource, getChildForRequest
23
from twisted.web._auth import basic, digest
24
from twisted.web._auth.wrapper import HTTPAuthSessionWrapper, UnauthorizedResource
25
from twisted.web._auth.basic import BasicCredentialFactory
27
from twisted.web.server import NOT_DONE_YET
28
from twisted.web.static import Data
30
from twisted.web.test.test_web import DummyRequest
34
return s.encode('base64').strip()
37
class BasicAuthTestsMixin:
39
L{TestCase} mixin class which defines a number of tests for
40
L{basic.BasicCredentialFactory}. Because this mixin defines C{setUp}, it
41
must be inherited before L{TestCase}.
44
self.request = self.makeRequest()
46
self.username = 'dreid'
47
self.password = 'S3CuR1Ty'
48
self.credentialFactory = basic.BasicCredentialFactory(self.realm)
51
def makeRequest(self, method='GET', clientAddress=None):
53
Create a request object to be passed to
54
L{basic.BasicCredentialFactory.decode} along with a response value.
55
Override this in a subclass.
57
raise NotImplementedError("%r did not implement makeRequest" % (
61
def test_interface(self):
63
L{BasicCredentialFactory} implements L{ICredentialFactory}.
66
verifyObject(ICredentialFactory, self.credentialFactory))
69
def test_usernamePassword(self):
    """
    Decoding a base64-encoded C{username:password} response with
    L{basic.BasicCredentialFactory.decode} yields a provider of
    L{IUsernamePassword} which accepts the password that was encoded and
    rejects a different one.
    """
    plaintext = '%s:%s' % (self.username, self.password)
    encoded = b64encode(plaintext)
    credentials = self.credentialFactory.decode(encoded, self.request)

    self.assertTrue(IUsernamePassword.providedBy(credentials))
    # The encoded password must verify; a modified one must not.
    rightPassword = self.password
    wrongPassword = rightPassword + 'wrong'
    self.assertTrue(credentials.checkPassword(rightPassword))
    self.assertFalse(credentials.checkPassword(wrongPassword))
83
def test_incorrectPadding(self):
    """
    L{basic.BasicCredentialFactory.decode} still produces usable
    credentials when the C{'='} padding has been stripped from the
    base64-encoded response.
    """
    padded = b64encode('%s:%s' % (self.username, self.password))
    unpadded = padded.strip('=')

    credentials = self.credentialFactory.decode(unpadded, self.request)
    provides = verifyObject(IUsernamePassword, credentials)
    self.assertTrue(provides)
    self.assertTrue(credentials.checkPassword(self.password))
96
def test_invalidEncoding(self):
98
L{basic.BasicCredentialFactory.decode} raises L{LoginFailed} if passed
99
a response which is not base64-encoded.
101
response = 'x' # one byte cannot be valid base64 text
104
self.credentialFactory.decode, response, self.makeRequest())
107
def test_invalidCredentials(self):
109
L{basic.BasicCredentialFactory.decode} raises L{LoginFailed} when
110
passed base64-encoded text which does not contain a C{':'} separator.
112
response = b64encode('123abc+/')
115
self.credentialFactory.decode,
116
response, self.makeRequest())
120
def makeRequest(self, method='GET', clientAddress=None):
122
Create a L{DummyRequest} (change me to create a
123
L{twisted.web.http.Request} instead).
125
request = DummyRequest('/')
126
request.method = method
127
request.client = clientAddress
132
class BasicAuthTestCase(RequestMixin, BasicAuthTestsMixin, unittest.TestCase):
134
Basic authentication tests which use L{RequestMixin}'s L{DummyRequest}.
139
class DigestAuthTestCase(RequestMixin, unittest.TestCase):
141
Digest authentication tests which use L{RequestMixin}'s L{DummyRequest}.
146
Create a DigestCredentialFactory for testing
148
self.realm = "test realm"
149
self.algorithm = "md5"
150
self.credentialFactory = digest.DigestCredentialFactory(
151
self.algorithm, self.realm)
152
self.request = self.makeRequest()
155
def test_decode(self):
157
L{digest.DigestCredentialFactory.decode} calls the C{decode} method on
158
L{twisted.cred.digest.DigestCredentialFactory} with the HTTP method and
165
def check(_response, _method, _host):
166
self.assertEqual(response, _response)
167
self.assertEqual(method, _method)
168
self.assertEqual(host, _host)
171
self.patch(self.credentialFactory.digest, 'decode', check)
172
req = self.makeRequest(method, IPv4Address('TCP', host, 81))
173
self.credentialFactory.decode(response, req)
174
self.assertTrue(done[0])
177
def test_interface(self):
179
L{DigestCredentialFactory} implements L{ICredentialFactory}.
182
verifyObject(ICredentialFactory, self.credentialFactory))
185
def test_getChallenge(self):
    """
    The challenge issued by L{DigestCredentialFactory.getChallenge} must
    include C{'qop'}, C{'realm'}, C{'algorithm'}, C{'nonce'}, and
    C{'opaque'} keys.  The values for the C{'realm'} and C{'algorithm'}
    keys must match the values supplied to the factory's initializer.
    None of the values may have newlines in them.
    """
    challenge = self.credentialFactory.getChallenge(self.request)
    self.assertEqual(challenge['qop'], 'auth')
    # Compare against the values setUp handed to the factory rather than
    # repeating the literals, so the test keeps checking what its
    # docstring promises if setUp ever changes.
    self.assertEqual(challenge['realm'], self.realm)
    self.assertEqual(challenge['algorithm'], self.algorithm)
    self.assertIn('nonce', challenge)
    self.assertIn('opaque', challenge)
    for value in challenge.values():
        self.assertNotIn('\n', value)
203
def test_getChallengeWithoutClientIP(self):
    """
    L{DigestCredentialFactory.getChallenge} still issues a complete
    challenge when the request it is given was created without a client
    address.
    """
    addresslessRequest = self.makeRequest('GET', None)
    issued = self.credentialFactory.getChallenge(addresslessRequest)

    # Keys whose values are fixed, checked in the same order as before.
    expectedValues = (
        ('qop', 'auth'),
        ('realm', 'test realm'),
        ('algorithm', 'md5'),
    )
    for key, expected in expectedValues:
        self.assertEqual(issued[key], expected)

    # Keys which only need to be present.
    for requiredKey in ('nonce', 'opaque'):
        self.assertIn(requiredKey, issued)
218
class UnauthorizedResourceTests(unittest.TestCase):
220
Tests for L{UnauthorizedResource}.
222
def test_getChildWithDefault(self):
    """
    Looking up any child of an L{UnauthorizedResource} gives back that
    same resource.
    """
    unauthorized = UnauthorizedResource([])
    for childName in ("foo", "bar"):
        self.assertIdentical(
            unauthorized.getChildWithDefault(childName, None),
            unauthorized)
233
def test_render(self):
235
L{UnauthorizedResource} renders with a 401 response code and a
236
I{WWW-Authenticate} header and puts a simple unauthorized message
237
into the response body.
239
resource = UnauthorizedResource([
240
BasicCredentialFactory('example.com')])
241
request = DummyRequest([''])
242
request.render(resource)
243
self.assertEqual(request.responseCode, 401)
245
request.responseHeaders.getRawHeaders('www-authenticate'),
246
['basic realm="example.com"'])
247
self.assertEqual(request.written, ['Unauthorized'])
250
def test_renderQuotesRealm(self):
252
The realm value included in the I{WWW-Authenticate} header set in
253
the response when L{UnauthorizedResource} is rendered has quotes
254
and backslashes escaped.
256
resource = UnauthorizedResource([
257
BasicCredentialFactory('example\\"foo')])
258
request = DummyRequest([''])
259
request.render(resource)
261
request.responseHeaders.getRawHeaders('www-authenticate'),
262
['basic realm="example\\\\\\"foo"'])
268
A simple L{IRealm} implementation which gives out L{WebAvatar} for any
271
@type loggedIn: C{int}
272
@ivar loggedIn: The number of times C{requestAvatar} has been invoked for
275
@type loggedOut: C{int}
276
@ivar loggedOut: The number of times the logout callback has been invoked.
278
implements(portal.IRealm)
280
def __init__(self, avatarFactory):
283
self.avatarFactory = avatarFactory
286
def requestAvatar(self, avatarId, mind, *interfaces):
287
if IResource in interfaces:
289
return IResource, self.avatarFactory(avatarId), self.logout
290
raise NotImplementedError()
298
class HTTPAuthHeaderTests(unittest.TestCase):
300
Tests for L{HTTPAuthSessionWrapper}.
302
makeRequest = DummyRequest
306
Create a realm, portal, and L{HTTPAuthSessionWrapper} to use in the tests.
308
self.username = 'foo bar'
309
self.password = 'bar baz'
310
self.avatarContent = "contents of the avatar resource itself"
311
self.childName = "foo-child"
312
self.childContent = "contents of the foo child of the avatar"
313
self.checker = InMemoryUsernamePasswordDatabaseDontUse()
314
self.checker.addUser(self.username, self.password)
315
self.avatar = Data(self.avatarContent, 'text/plain')
316
self.avatar.putChild(
317
self.childName, Data(self.childContent, 'text/plain'))
318
self.avatars = {self.username: self.avatar}
319
self.realm = Realm(self.avatars.get)
320
self.portal = portal.Portal(self.realm, [self.checker])
321
self.credentialFactories = []
322
self.wrapper = HTTPAuthSessionWrapper(
323
self.portal, self.credentialFactories)
326
def _authorizedBasicLogin(self, request):
    """
    Set a I{Basic} I{Authorization} header built from C{self.username}
    and C{self.password} on C{request}, then perform resource traversal
    starting from C{self.wrapper}.

    @param request: The request to add the header to and to dispatch.

    @return: The result of L{getChildForRequest} for C{self.wrapper} and
        C{request}.
    """
    credentials = ':'.join((self.username, self.password))
    headerValue = 'Basic ' + b64encode(credentials)
    request.headers['authorization'] = headerValue
    return getChildForRequest(self.wrapper, request)
337
def test_getChildWithDefault(self):
339
Resource traversal which encounters an L{HTTPAuthSessionWrapper}
340
results in an L{UnauthorizedResource} instance when the request does
341
not have the required I{Authorization} headers.
343
request = self.makeRequest([self.childName])
344
child = getChildForRequest(self.wrapper, request)
345
d = request.notifyFinish()
346
def cbFinished(result):
347
self.assertEquals(request.responseCode, 401)
348
d.addCallback(cbFinished)
349
request.render(child)
353
def _invalidAuthorizationTest(self, response):
355
Create a request with the given value as the value of an
356
I{Authorization} header and perform resource traversal with it,
357
starting at C{self.wrapper}. Assert that the result is a 401 response
358
code. Return a L{Deferred} which fires when this is all done.
360
self.credentialFactories.append(BasicCredentialFactory('example.com'))
361
request = self.makeRequest([self.childName])
362
request.headers['authorization'] = response
363
child = getChildForRequest(self.wrapper, request)
364
d = request.notifyFinish()
365
def cbFinished(result):
366
self.assertEqual(request.responseCode, 401)
367
d.addCallback(cbFinished)
368
request.render(child)
372
def test_getChildWithDefaultUnauthorizedUser(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    results in an L{UnauthorizedResource} when the request has an
    I{Authorization} header with a user which does not exist.
    """
    unknownUser = b64encode('foo:bar')
    return self._invalidAuthorizationTest('Basic ' + unknownUser)
381
def test_getChildWithDefaultUnauthorizedPassword(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    results in an L{UnauthorizedResource} when the request has an
    I{Authorization} header with a user which exists and the wrong
    password.
    """
    wrongCredentials = self.username + ':bar'
    header = 'Basic ' + b64encode(wrongCredentials)
    return self._invalidAuthorizationTest(header)
392
def test_getChildWithDefaultUnrecognizedScheme(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    results in an L{UnauthorizedResource} when the request has an
    I{Authorization} header with an unrecognized scheme.
    """
    unsupportedHeader = 'Quux foo bar baz'
    return self._invalidAuthorizationTest(unsupportedHeader)
401
def test_getChildWithDefaultAuthorized(self):
403
Resource traversal which encounters an L{HTTPAuthSessionWrapper}
404
results in an L{IResource} which renders the L{IResource} avatar
405
retrieved from the portal when the request has a valid I{Authorization}
408
self.credentialFactories.append(BasicCredentialFactory('example.com'))
409
request = self.makeRequest([self.childName])
410
child = self._authorizedBasicLogin(request)
411
d = request.notifyFinish()
412
def cbFinished(ignored):
413
self.assertEquals(request.written, [self.childContent])
414
d.addCallback(cbFinished)
415
request.render(child)
419
def test_renderAuthorized(self):
421
Resource traversal which terminates at an L{HTTPAuthSessionWrapper}
422
and includes correct authentication headers results in the
423
L{IResource} avatar (not one of its children) retrieved from the
424
portal being rendered.
426
self.credentialFactories.append(BasicCredentialFactory('example.com'))
427
# Request it exactly, not any of its children.
428
request = self.makeRequest([])
429
child = self._authorizedBasicLogin(request)
430
d = request.notifyFinish()
431
def cbFinished(ignored):
432
self.assertEquals(request.written, [self.avatarContent])
433
d.addCallback(cbFinished)
434
request.render(child)
438
def test_getChallengeCalledWithRequest(self):
440
When L{HTTPAuthSessionWrapper} finds an L{ICredentialFactory} to issue
441
a challenge, it calls the C{getChallenge} method with the request as an
444
class DumbCredentialFactory(object):
445
implements(ICredentialFactory)
451
def getChallenge(self, request):
452
self.requests.append(request)
455
factory = DumbCredentialFactory()
456
self.credentialFactories.append(factory)
457
request = self.makeRequest([self.childName])
458
child = getChildForRequest(self.wrapper, request)
459
d = request.notifyFinish()
460
def cbFinished(ignored):
461
self.assertEqual(factory.requests, [request])
462
d.addCallback(cbFinished)
463
request.render(child)
467
def test_logout(self):
469
The realm's logout callback is invoked after the resource is rendered.
471
self.credentialFactories.append(BasicCredentialFactory('example.com'))
473
class SlowerResource(Resource):
474
def render(self, request):
477
self.avatar.putChild(self.childName, SlowerResource())
478
request = self.makeRequest([self.childName])
479
child = self._authorizedBasicLogin(request)
480
request.render(child)
481
self.assertEqual(self.realm.loggedOut, 0)
483
self.assertEqual(self.realm.loggedOut, 1)
486
def test_decodeRaises(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    results in an L{UnauthorizedResource} when the request has a I{Basic
    Authorization} header which cannot be decoded using base64.
    """
    basicFactory = BasicCredentialFactory('example.com')
    self.credentialFactories.append(basicFactory)

    undecodable = self.makeRequest([self.childName])
    undecodable.headers['authorization'] = 'Basic decode should fail'

    resolved = getChildForRequest(self.wrapper, undecodable)
    self.assertIsInstance(resolved, UnauthorizedResource)
499
def test_selectParseResponse(self):
501
L{HTTPAuthSessionWrapper._selectParseHeader} returns a two-tuple giving
502
the L{ICredentialFactory} to use to parse the header and a string
503
containing the portion of the header which remains to be parsed.
505
basicAuthorization = 'Basic abcdef123456'
507
self.wrapper._selectParseHeader(basicAuthorization),
509
factory = BasicCredentialFactory('example.com')
510
self.credentialFactories.append(factory)
512
self.wrapper._selectParseHeader(basicAuthorization),
513
(factory, 'abcdef123456'))
516
def test_unexpectedDecodeError(self):
518
Any unexpected exception raised by the credential factory's C{decode}
519
method results in a 500 response code and causes the exception to be
522
class UnexpectedException(Exception):
525
class BadFactory(object):
528
def getChallenge(self, client):
531
def decode(self, response, request):
532
raise UnexpectedException()
534
self.credentialFactories.append(BadFactory())
535
request = self.makeRequest([self.childName])
536
request.headers['authorization'] = 'Bad abc'
537
child = getChildForRequest(self.wrapper, request)
538
request.render(child)
539
self.assertEqual(request.responseCode, 500)
540
self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)
543
def test_unexpectedLoginError(self):
545
Any unexpected failure from L{Portal.login} results in a 500 response
546
code and causes the failure to be logged.
548
class UnexpectedException(Exception):
551
class BrokenChecker(object):
552
credentialInterfaces = (IUsernamePassword,)
554
def requestAvatarId(self, credentials):
555
raise UnexpectedException()
557
self.portal.registerChecker(BrokenChecker())
558
self.credentialFactories.append(BasicCredentialFactory('example.com'))
559
request = self.makeRequest([self.childName])
560
child = self._authorizedBasicLogin(request)
561
request.render(child)
562
self.assertEqual(request.responseCode, 500)
563
self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)
566
def test_anonymousAccess(self):
568
Anonymous requests are allowed if a L{Portal} has an anonymous checker
571
unprotectedContents = "contents of the unprotected child resource"
573
self.avatars[ANONYMOUS] = Resource()
574
self.avatars[ANONYMOUS].putChild(
575
self.childName, Data(unprotectedContents, 'text/plain'))
576
self.portal.registerChecker(AllowAnonymousAccess())
578
self.credentialFactories.append(BasicCredentialFactory('example.com'))
579
request = self.makeRequest([self.childName])
580
child = getChildForRequest(self.wrapper, request)
581
d = request.notifyFinish()
582
def cbFinished(ignored):
583
self.assertEquals(request.written, [unprotectedContents])
584
d.addCallback(cbFinished)
585
request.render(child)