1
# Copyright (c) 2008 Divmod. See LICENSE for details.
4
Tests for L{epsilon.ampauth}.
8
epsilon.hotfix.require('twisted', 'loopbackasync_reentrancy')
10
from hashlib import sha1
12
from zope.interface import implements
13
from zope.interface.verify import verifyObject
15
from twisted.python.failure import Failure
16
from twisted.internet.error import ConnectionDone
17
from twisted.cred.error import UnauthorizedLogin
18
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
19
from twisted.cred.credentials import UsernamePassword
20
from twisted.cred.portal import Portal
21
from twisted.protocols.amp import IBoxReceiver, BinaryBoxProtocol, CommandLocator, AMP
22
from twisted.protocols.loopback import loopbackAsync
23
from twisted.trial.unittest import TestCase
25
from epsilon.ampauth import (
26
_AMPOneTimePad, _AMPUsernamePassword, _calcResponse, UnhandledCredentials,
27
CredReceiver, PasswordLogin, OTPLogin, PasswordChallengeResponse,
28
OneTimePadChecker, CredAMPServerFactory, login)
35
def __init__(self, avatar):
41
def requestAvatar(self, avatarId, mind, *interfaces):
    """
    Record the avatar request and hand back the stub avatar.

    @param avatarId: the identifier of the avatar being requested.
    @param mind: the mind object supplied with the request.
    @param interfaces: the interfaces the caller is asking for.

    @return: a three-tuple of the first requested interface,
        C{self.avatar} and C{self.logout}.
    """
    # Keep every request so tests can inspect exactly what was asked for.
    self.requests.append((avatarId, mind, interfaces))
    return interfaces[0], self.avatar, self.logout
53
An L{IBoxReceiver} implementation which can be used as an avatar by the
54
L{CredReceiver} tests.
56
implements(IBoxReceiver)
58
def startReceivingBoxes(self, sender):
    """
    Remember the box sender this receiver has been attached to.

    @param sender: the object to store as C{self.boxSender}.
    """
    self.boxSender = sender
62
def ampBoxReceived(self, box):
66
def stopReceivingBoxes(self, reason):
69
verifyObject(IBoxReceiver, StubAvatar())
73
class CredReceiverTests(TestCase):
75
Tests for L{CredReceiver}, an L{IBoxReceiver} which integrates with
76
L{twisted.cred} to provide authentication and authorization of AMP
81
Create a L{CredReceiver} hooked up to a fake L{IBoxSender} which
82
records boxes sent through it.
84
self.username = 'alice@example.com'
85
self.password = 'foo bar baz'
86
self.checker = InMemoryUsernamePasswordDatabaseDontUse()
87
self.checker.addUser(self.username, self.password)
88
self.avatar = StubAvatar()
89
self.realm = StubRealm(self.avatar)
90
self.portal = Portal(self.realm, [self.checker])
91
self.server = CredReceiver()
92
self.server.portal = self.portal
94
self.finished = loopbackAsync(self.server, self.client)
97
def test_otpLogin(self):
    """
    L{CredReceiver.otpLogin} returns without error if the pad is valid.
    """
    PAD = 'test_otpLogin'
    self.portal.registerChecker(OneTimePadChecker({PAD: 'user'}))
    d = self.server.otpLogin(PAD)
    def cbLoggedIn(result):
        self.assertEqual(result, {})
    d.addCallback(cbLoggedIn)
    # Hand the Deferred to trial so a failure in the callback is attributed
    # to this test.
    return d
110
def test_otpLoginUnauthorized(self):
112
L{CredReceiver.otpLogin} should fail with L{UnauthorizedLogin} if an
113
invalid pad is received.
115
self.portal.registerChecker(OneTimePadChecker({}))
116
return self.assertFailure(
117
self.server.otpLogin('test_otpLoginUnauthorized'),
121
def test_otpLoginNotImplemented(self):
    """
    L{CredReceiver.otpLogin} should fail with L{NotImplementedError} if
    the realm raises L{NotImplementedError} when asked for the avatar.
    """
    def noAvatar(avatarId, mind, *interfaces):
        raise NotImplementedError()
    # Swap out the realm's avatar lookup so that every request fails.
    self.realm.requestAvatar = noAvatar

    PAD = 'test_otpLoginNotImplemented'
    self.portal.registerChecker(OneTimePadChecker({PAD: 'user'}))
    return self.assertFailure(
        self.server.otpLogin(PAD), NotImplementedError)
136
def test_otpLoginResponder(self):
    """
    L{CredReceiver} responds to the L{OTPLogin} command.
    """
    PAD = 'test_otpLoginResponder'
    self.portal.registerChecker(OneTimePadChecker({PAD: 'user'}))
    d = self.client.callRemote(OTPLogin, pad=PAD)
    def cbLoggedIn(result):
        self.assertEqual(result, {})
    d.addCallback(cbLoggedIn)
    # Hand the Deferred to trial so a failure in the callback is attributed
    # to this test.
    return d
149
def test_passwordLoginDifferentChallenges(self):
    """
    L{CredReceiver.passwordLogin} returns a new challenge each time it is
    called.
    """
    first = self.server.passwordLogin(self.username)
    second = self.server.passwordLogin(self.username)
    self.assertNotEqual(first['challenge'], second['challenge'])
159
def test_passwordLoginResponder(self):
161
L{CredReceiver} responds to the L{PasswordLogin} L{Command} with a
164
d = self.client.callRemote(PasswordLogin, username=self.username)
166
self.assertIn('challenge', result)
167
d.addCallback(cbLogin)
171
def test_determineFromDifferentNonces(self):
    """
    Each time L{PasswordChallengeResponse.determineFrom} is used, it
    generates a different C{cnonce} value.
    """
    # Identical inputs on both calls, so any difference comes from the
    # generated nonce alone.
    first = PasswordChallengeResponse.determineFrom('a', 'b')
    second = PasswordChallengeResponse.determineFrom('a', 'b')
    self.assertNotEqual(first['cnonce'], second['cnonce'])
181
def test_passwordChallengeResponse(self):
183
L{CredReceiver.passwordChallengeResponse} returns without error if the
186
challenge = self.server.passwordLogin(self.username)['challenge']
188
cleartext = '%s %s %s' % (challenge, cnonce, self.password)
189
response = sha1(cleartext).digest()
190
d = self.server.passwordChallengeResponse(cnonce, response)
191
def cbLoggedIn(result):
192
self.assertEqual(result, {})
193
d.addCallback(cbLoggedIn)
197
def test_passwordChallengeResponseResponder(self):
    """
    L{CredReceiver} responds to the L{PasswordChallengeResponse} L{Command}
    with an empty box if the response supplied is valid.
    """
    challenge = self.server.passwordLogin(self.username)['challenge']
    d = self.client.callRemote(
        PasswordChallengeResponse, **PasswordChallengeResponse.determineFrom(
            challenge, self.password))
    def cbResponded(result):
        self.assertEqual(result, {})
    d.addCallback(cbResponded)
    # Hand the Deferred to trial so a failure in the callback is attributed
    # to this test.
    return d
212
def test_response(self):
    """
    L{PasswordChallengeResponse.determineFrom} generates the correct
    response to a challenge issued by L{CredReceiver.passwordLogin}.
    """
    challenge = self.server.passwordLogin(self.username)['challenge']
    result = PasswordChallengeResponse.determineFrom(
        challenge, self.password)
    d = self.server.passwordChallengeResponse(**result)
    def cbLoggedIn(ignored):
        # Exactly one avatar request must have reached the realm.
        [(avatarId, mind, interfaces)] = self.realm.requests
        self.assertEqual(avatarId, self.username)
        self.assertEqual(interfaces, (IBoxReceiver,))

        # The avatar is now the protocol's box receiver.
        self.assertIdentical(self.server.boxReceiver, self.avatar)

        # And the avatar has been started up with the protocol's box
        # sender.
        self.assertIdentical(self.avatar.boxSender, self.server.boxSender)

        # After the connection is lost, the logout function should be
        # called.
        self.assertEqual(self.realm.loggedOut, 0)
        self.server.connectionLost(
            Failure(ConnectionDone("test connection lost")))
        self.assertEqual(self.realm.loggedOut, 1)

    d.addCallback(cbLoggedIn)
    # Hand the Deferred to trial so a failure in the callback is attributed
    # to this test.
    return d
244
def test_invalidResponse(self):
246
L{CredReceiver.passwordChallengeResponse} returns a L{Deferred} which
247
fails with L{UnauthorizedLogin} if it is passed a response which is not
250
challenge = self.server.passwordLogin(self.username)['challenge']
251
return self.assertFailure(
252
self.server.passwordChallengeResponse(cnonce='bar', response='baz'),
256
def test_connectionLostWithoutAvatar(self):
    """
    L{CredReceiver.connectionLost} does not raise an exception if no login
    has occurred when it is called.
    """
    # No assertion is needed: the test passes as long as this call returns
    # without raising.
    self.server.connectionLost(
        Failure(ConnectionDone("test connection lost")))
265
def test_unrecognizedCredentialsLogin(self):
    """
    L{login} raises L{UnhandledCredentials} if passed a credentials object
    which provides no interface explicitly supported by that function
    (currently L{IUsernamePassword}).
    """
    # C{None} stands in for both the client and the credentials.
    self.assertRaises(UnhandledCredentials, login, None, None)
274
def test_passwordChallengeLogin(self):
    """
    L{login} issues the commands necessary to authenticate against
    L{CredReceiver} when given an L{IUsernamePassword} provider with its
    C{username} and C{password} attributes set to valid credentials.
    """
    loginDeferred = login(
        self.client, UsernamePassword(self.username, self.password))

    def cbLoggedIn(clientAgain):
        # The Deferred fires with the very client that was passed in, and
        # the server has switched over to the avatar.
        self.assertIdentical(self.client, clientAgain)
        self.assertIdentical(self.server.boxReceiver, self.avatar)
    loginDeferred.addCallback(cbLoggedIn)
    # Hand the Deferred to trial so a failure in the callback is attributed
    # to this test.
    return loginDeferred
290
def test_passwordChallengeInvalid(self):
    """
    L{login} returns a L{Deferred} which fails with L{UnauthorizedLogin} if
    the L{UsernamePassword} credentials object given does not contain valid
    authentication information.
    """
    # Remember the receiver in place before the attempt so it can be
    # compared afterwards.
    boxReceiver = self.server.boxReceiver
    loginDeferred = login(
        self.client, UsernamePassword(self.username + 'x', self.password))
    self.assertFailure(loginDeferred, UnauthorizedLogin)
    def cbFailed(ignored):
        # A failed login must leave the server's box receiver untouched.
        self.assertIdentical(self.server.boxReceiver, boxReceiver)
    loginDeferred.addCallback(cbFailed)
    # Hand the Deferred to trial so a failure in the callback is attributed
    # to this test.
    return loginDeferred
306
def test_noAvatar(self):
    """
    L{login} returns a L{Deferred} which fails with L{NotImplementedError}
    if the realm raises L{NotImplementedError} when asked for the avatar.
    """
    def noAvatar(avatarId, mind, *interfaces):
        raise NotImplementedError()
    # Swap out the realm's avatar lookup so that every request fails.
    self.realm.requestAvatar = noAvatar

    loginDeferred = login(
        self.client, UsernamePassword(self.username, self.password))
    return self.assertFailure(loginDeferred, NotImplementedError)
321
class AMPUsernamePasswordTests(TestCase):
323
Tests for L{_AMPUsernamePassword}, a credentials type which works with
324
username/challenge/nonce/responses of the form used by L{PasswordLogin}.
327
self.username = 'user name'
328
password = u'foo bar\N{LATIN SMALL LETTER E WITH ACUTE}'
329
self.password = password.encode('utf-8')
330
self.challenge = '123xyzabc789'
331
self.nonce = '1 2 3 4 5'
332
self.response = _calcResponse(
333
self.challenge, self.nonce, self.password)
334
self.credentials = _AMPUsernamePassword(
335
self.username, self.challenge, self.nonce, self.response)
337
def test_checkPasswordString(self):
    """
    L{_AMPUsernamePassword} accepts a C{str} for the known correct
    password and returns C{True} if the response matches it.
    """
    self.assertTrue(self.credentials.checkPassword(self.password))
345
def test_checkInvalidPasswordString(self):
    """
    L{_AMPUsernamePassword} accepts a C{str} for the known correct
    password and returns C{False} if the response does not match it.
    """
    self.assertFalse(self.credentials.checkPassword('quux'))
353
def test_checkPasswordUnicode(self):
355
L{_AMPUsernamePassword} accepts a C{unicode} for the known correct
356
password and returns C{True} if the response matches the UTF-8 encoding
360
self.credentials.checkPassword(self.password.decode('utf-8')))
363
def test_checkInvalidPasswordUnicode(self):
365
L{_AMPUsernamePassword} accepts a C{unicode} for the known correct
366
password and returns C{False} if the response does not match the UTF-8
370
self.credentials.checkPassword(
371
u'\N{LATIN SMALL LETTER E WITH ACUTE}'))
375
class CredAMPServerFactoryTests(TestCase):
377
Tests for L{CredAMPServerFactory}.
379
def test_buildProtocol(self):
381
L{CredAMPServerFactory.buildProtocol} returns a L{CredReceiver}
382
instance with its C{portal} attribute set to the portal object passed
383
to L{CredAMPServerFactory.__init__}.
386
factory = CredAMPServerFactory(portal)
387
proto = factory.buildProtocol(None)
388
self.assertIsInstance(proto, CredReceiver)
389
self.assertIdentical(proto.portal, portal)
393
class OneTimePadCheckerTests(TestCase):
395
Tests for L{OneTimePadChecker}.
397
def test_requestAvatarId(self):
399
L{OneTimePadChecker.requestAvatarId} should return the username in the
400
case the pad is valid.
402
PAD = 'test_requestAvatarId'
403
USERNAME = 'test_requestAvatarId username'
404
checker = OneTimePadChecker({PAD: USERNAME})
406
checker.requestAvatarId(_AMPOneTimePad(PAD)), USERNAME)
409
def test_requestAvatarIdUnauthorized(self):
411
L{OneTimePadChecker.requestAvatarId} should raise L{UnauthorizedLogin}
412
if an unknown pad is given.
414
checker = OneTimePadChecker({})
417
lambda: checker.requestAvatarId(_AMPOneTimePad(None)))
420
def test_oneTimePad(self):
422
L{OneTimePadChecker.requestAvatarId} should invalidate the pad if a
425
PAD = 'test_requestAvatarId'
426
checker = OneTimePadChecker({PAD: 'username'})
427
checker.requestAvatarId(_AMPOneTimePad(PAD))
430
lambda: checker.requestAvatarId(_AMPOneTimePad(PAD)))