~divmod-dev/divmod.org/trunk

« back to all changes in this revision

Viewing changes to Epsilon/epsilon/test/test_ampauth.py

  • Committer: Jean-Paul Calderone
  • Date: 2014-06-29 20:33:04 UTC
  • mfrom: (2749.1.1 remove-epsilon-1325289)
  • Revision ID: exarkun@twistedmatrix.com-20140629203304-gdkmbwl1suei4m97
mergeĀ lp:~exarkun/divmod.org/remove-epsilon-1325289

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2008 Divmod.  See LICENSE for details.
2
 
 
3
 
"""
4
 
Tests for L{epsilon.ampauth}.
5
 
"""
6
 
 
7
 
import epsilon.hotfix
8
 
epsilon.hotfix.require('twisted', 'loopbackasync_reentrancy')
9
 
 
10
 
from hashlib import sha1
11
 
 
12
 
from zope.interface import implements
13
 
from zope.interface.verify import verifyObject
14
 
 
15
 
from twisted.python.failure import Failure
16
 
from twisted.internet.error import ConnectionDone
17
 
from twisted.cred.error import UnauthorizedLogin
18
 
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
19
 
from twisted.cred.credentials import UsernamePassword
20
 
from twisted.cred.portal import Portal
21
 
from twisted.protocols.amp import IBoxReceiver, BinaryBoxProtocol, CommandLocator, AMP
22
 
from twisted.protocols.loopback import loopbackAsync
23
 
from twisted.trial.unittest import TestCase
24
 
 
25
 
from epsilon.ampauth import (
26
 
    _AMPOneTimePad, _AMPUsernamePassword, _calcResponse, UnhandledCredentials,
27
 
    CredReceiver, PasswordLogin, OTPLogin, PasswordChallengeResponse,
28
 
    OneTimePadChecker, CredAMPServerFactory, login)
29
 
 
30
 
__metaclass__ = type
31
 
 
32
 
 
33
 
 
34
 
class StubRealm:
35
 
    def __init__(self, avatar):
36
 
        self.avatar = avatar
37
 
        self.loggedOut = 0
38
 
        self.requests = []
39
 
 
40
 
 
41
 
    def requestAvatar(self, avatarId, mind, *interfaces):
42
 
        self.requests.append((avatarId, mind, interfaces))
43
 
        return interfaces[0], self.avatar, self.logout
44
 
 
45
 
 
46
 
    def logout(self):
47
 
        self.loggedOut += 1
48
 
 
49
 
 
50
 
 
51
 
class StubAvatar:
52
 
    """
53
 
    An L{IBoxReceiver} implementation which can be used as an avatar by the
54
 
    L{CredReceiver} tests.
55
 
    """
56
 
    implements(IBoxReceiver)
57
 
 
58
 
    def startReceivingBoxes(self, sender):
59
 
        self.boxSender = sender
60
 
 
61
 
 
62
 
    def ampBoxReceived(self, box):
63
 
        pass
64
 
 
65
 
 
66
 
    def stopReceivingBoxes(self, reason):
67
 
        pass
68
 
 
69
 
verifyObject(IBoxReceiver, StubAvatar())
70
 
 
71
 
 
72
 
 
73
 
class CredReceiverTests(TestCase):
74
 
    """
75
 
    Tests for L{CredReceiver}, an L{IBoxReceiver} which integrates with
76
 
    L{twisted.cred} to provide authentication and authorization of AMP
77
 
    connections.
78
 
    """
79
 
    def setUp(self):
80
 
        """
81
 
        Create a L{CredReceiver} hooked up to a fake L{IBoxSender} which
82
 
        records boxes sent through it.
83
 
        """
84
 
        self.username = 'alice@example.com'
85
 
        self.password = 'foo bar baz'
86
 
        self.checker = InMemoryUsernamePasswordDatabaseDontUse()
87
 
        self.checker.addUser(self.username, self.password)
88
 
        self.avatar = StubAvatar()
89
 
        self.realm = StubRealm(self.avatar)
90
 
        self.portal = Portal(self.realm, [self.checker])
91
 
        self.server = CredReceiver()
92
 
        self.server.portal = self.portal
93
 
        self.client = AMP()
94
 
        self.finished = loopbackAsync(self.server, self.client)
95
 
 
96
 
 
97
 
    def test_otpLogin(self):
98
 
        """
99
 
        L{CredReceiver.otpLogin} returns without error if the pad is valid.
100
 
        """
101
 
        PAD = 'test_otpLogin'
102
 
        self.portal.registerChecker(OneTimePadChecker({PAD: 'user'}))
103
 
        d = self.server.otpLogin(PAD)
104
 
        def cbLoggedIn(result):
105
 
            self.assertEqual(result, {})
106
 
        d.addCallback(cbLoggedIn)
107
 
        return d
108
 
 
109
 
 
110
 
    def test_otpLoginUnauthorized(self):
111
 
        """
112
 
        L{CredReceiver.otpLogin} should fail with L{UnauthorizedLogin} if an
113
 
        invalid pad is received.
114
 
        """
115
 
        self.portal.registerChecker(OneTimePadChecker({}))
116
 
        return self.assertFailure(
117
 
            self.server.otpLogin('test_otpLoginUnauthorized'),
118
 
            UnauthorizedLogin)
119
 
 
120
 
 
121
 
    def test_otpLoginNotImplemented(self):
122
 
        """
123
 
        L{CredReceiver.otpLogin} should fail with L{NotImplementedError} if
124
 
        the realm raises L{NotImplementedError} when asked for the avatar.
125
 
        """
126
 
        def noAvatar(avatarId, mind, *interfaces):
127
 
            raise NotImplementedError()
128
 
        self.realm.requestAvatar = noAvatar
129
 
 
130
 
        PAD = 'test_otpLoginNotImplemented'
131
 
        self.portal.registerChecker(OneTimePadChecker({PAD: 'user'}))
132
 
        return self.assertFailure(
133
 
            self.server.otpLogin(PAD), NotImplementedError)
134
 
 
135
 
 
136
 
    def test_otpLoginResponder(self):
137
 
        """
138
 
        L{CredReceiver} responds to the L{OTPLogin} command.
139
 
        """
140
 
        PAD = 'test_otpLoginResponder'
141
 
        self.portal.registerChecker(OneTimePadChecker({PAD: 'user'}))
142
 
        d = self.client.callRemote(OTPLogin, pad=PAD)
143
 
        def cbLoggedIn(result):
144
 
            self.assertEqual(result, {})
145
 
        d.addCallback(cbLoggedIn)
146
 
        return d
147
 
 
148
 
 
149
 
    def test_passwordLoginDifferentChallenges(self):
150
 
        """
151
 
        L{CredReceiver.passwordLogin} returns a new challenge each time it is
152
 
        called.
153
 
        """
154
 
        first = self.server.passwordLogin(self.username)
155
 
        second = self.server.passwordLogin(self.username)
156
 
        self.assertNotEqual(first['challenge'], second['challenge'])
157
 
 
158
 
 
159
 
    def test_passwordLoginResponder(self):
160
 
        """
161
 
        L{CredReceiver} responds to the L{PasswordLogin} L{Command} with a
162
 
        challenge.
163
 
        """
164
 
        d = self.client.callRemote(PasswordLogin, username=self.username)
165
 
        def cbLogin(result):
166
 
            self.assertIn('challenge', result)
167
 
        d.addCallback(cbLogin)
168
 
        return d
169
 
 
170
 
 
171
 
    def test_determineFromDifferentNonces(self):
172
 
        """
173
 
        Each time L{PasswordChallengeResponse.determineFrom} is used, it
174
 
        generates a different C{cnonce} value.
175
 
        """
176
 
        first = PasswordChallengeResponse.determineFrom('a', 'b')
177
 
        second = PasswordChallengeResponse.determineFrom('a', 'b')
178
 
        self.assertNotEqual(first['cnonce'], second['cnonce'])
179
 
 
180
 
 
181
 
    def test_passwordChallengeResponse(self):
182
 
        """
183
 
        L{CredReceiver.passwordChallengeResponse} returns without error if the
184
 
        response is valid.
185
 
        """
186
 
        challenge = self.server.passwordLogin(self.username)['challenge']
187
 
        cnonce = '123abc'
188
 
        cleartext = '%s %s %s' % (challenge, cnonce, self.password)
189
 
        response = sha1(cleartext).digest()
190
 
        d = self.server.passwordChallengeResponse(cnonce, response)
191
 
        def cbLoggedIn(result):
192
 
            self.assertEqual(result, {})
193
 
        d.addCallback(cbLoggedIn)
194
 
        return d
195
 
 
196
 
 
197
 
    def test_passwordChallengeResponseResponder(self):
198
 
        """
199
 
        L{CredReceiver} responds to the L{PasswordChallengeResponse} L{Command}
200
 
        with an empty box if the response supplied is valid.
201
 
        """
202
 
        challenge = self.server.passwordLogin(self.username)['challenge']
203
 
        d = self.client.callRemote(
204
 
            PasswordChallengeResponse, **PasswordChallengeResponse.determineFrom(
205
 
                challenge, self.password))
206
 
        def cbResponded(result):
207
 
            self.assertEqual(result, {})
208
 
        d.addCallback(cbResponded)
209
 
        return d
210
 
 
211
 
 
212
 
    def test_response(self):
213
 
        """
214
 
        L{PasswordChallengeResponse.determineFrom} generates the correct
215
 
        response to a challenge issued by L{CredReceiver.passwordLogin}.
216
 
        """
217
 
        challenge = self.server.passwordLogin(self.username)['challenge']
218
 
        result = PasswordChallengeResponse.determineFrom(
219
 
            challenge, self.password)
220
 
        d = self.server.passwordChallengeResponse(**result)
221
 
        def cbLoggedIn(ignored):
222
 
            [(avatarId, mind, interfaces)] = self.realm.requests
223
 
            self.assertEqual(avatarId, self.username)
224
 
            self.assertEqual(interfaces, (IBoxReceiver,))
225
 
 
226
 
            # The avatar is now the protocol's box receiver.
227
 
            self.assertIdentical(self.server.boxReceiver, self.avatar)
228
 
 
229
 
            # And the avatar has been started up with the protocol's
230
 
            # IBoxSender.
231
 
            self.assertIdentical(self.avatar.boxSender, self.server.boxSender)
232
 
 
233
 
            # After the connection is lost, the logout function should be
234
 
            # called.
235
 
            self.assertEqual(self.realm.loggedOut, 0)
236
 
            self.server.connectionLost(
237
 
                Failure(ConnectionDone("test connection lost")))
238
 
            self.assertEqual(self.realm.loggedOut, 1)
239
 
 
240
 
        d.addCallback(cbLoggedIn)
241
 
        return d
242
 
 
243
 
 
244
 
    def test_invalidResponse(self):
245
 
        """
246
 
        L{CredReceiver.passwordChallengeResponse} returns a L{Deferred} which
247
 
        fails with L{UnauthorizedLogin} if it is passed a response which is not
248
 
        valid.
249
 
        """
250
 
        challenge = self.server.passwordLogin(self.username)['challenge']
251
 
        return self.assertFailure(
252
 
            self.server.passwordChallengeResponse(cnonce='bar', response='baz'),
253
 
            UnauthorizedLogin)
254
 
 
255
 
 
256
 
    def test_connectionLostWithoutAvatar(self):
257
 
        """
258
 
        L{CredReceiver.connectionLost} does not raise an exception if no login
259
 
        has occurred when it is called.
260
 
        """
261
 
        self.server.connectionLost(
262
 
            Failure(ConnectionDone("test connection lost")))
263
 
 
264
 
 
265
 
    def test_unrecognizedCredentialsLogin(self):
266
 
        """
267
 
        L{login} raises L{UnhandledCredentials} if passed a credentials object
268
 
        which provides no interface explicitly supported by that function,
269
 
        currently L{IUsernamePassword}.
270
 
        """
271
 
        self.assertRaises(UnhandledCredentials, login, None, None)
272
 
 
273
 
 
274
 
    def test_passwordChallengeLogin(self):
275
 
        """
276
 
        L{login} issues the commands necessary to authenticate against
277
 
        L{CredReceiver} when given an L{IUsernamePassword} provider with its
278
 
        C{username} and C{password} attributes set to valid credentials.
279
 
        """
280
 
        loginDeferred = login(
281
 
            self.client, UsernamePassword(self.username, self.password))
282
 
 
283
 
        def cbLoggedIn(clientAgain):
284
 
            self.assertIdentical(self.client, clientAgain)
285
 
            self.assertIdentical(self.server.boxReceiver, self.avatar)
286
 
        loginDeferred.addCallback(cbLoggedIn)
287
 
        return loginDeferred
288
 
 
289
 
 
290
 
    def test_passwordChallengeInvalid(self):
291
 
        """
292
 
        L{login} returns a L{Deferred} which fires with L{UnauthorizedLogin} if
293
 
        the L{UsernamePassword} credentials object given does not contain valid
294
 
        authentication information.
295
 
        """
296
 
        boxReceiver = self.server.boxReceiver
297
 
        loginDeferred = login(
298
 
            self.client, UsernamePassword(self.username + 'x', self.password))
299
 
        self.assertFailure(loginDeferred, UnauthorizedLogin)
300
 
        def cbFailed(ignored):
301
 
            self.assertIdentical(self.server.boxReceiver, boxReceiver)
302
 
        loginDeferred.addCallback(cbFailed)
303
 
        return loginDeferred
304
 
 
305
 
 
306
 
    def test_noAvatar(self):
307
 
        """
308
 
        L{login} returns a L{Deferred} which fires with L{NotImplementedError}
309
 
        if the realm raises L{NotImplementedError} when asked for the avatar.
310
 
        """
311
 
        def noAvatar(avatarId, mind, *interfaces):
312
 
            raise NotImplementedError()
313
 
        self.realm.requestAvatar = noAvatar
314
 
 
315
 
        loginDeferred = login(
316
 
            self.client, UsernamePassword(self.username, self.password))
317
 
        return self.assertFailure(loginDeferred, NotImplementedError)
318
 
 
319
 
 
320
 
 
321
 
class AMPUsernamePasswordTests(TestCase):
322
 
    """
323
 
    Tests for L{_AMPUsernamePasswordTests}, a credentials type which works with
324
 
    username/challenge/nonce/responses of the form used by L{PasswordLogin}.
325
 
    """
326
 
    def setUp(self):
327
 
        self.username = 'user name'
328
 
        password = u'foo bar\N{LATIN SMALL LETTER E WITH ACUTE}'
329
 
        self.password = password.encode('utf-8')
330
 
        self.challenge = '123xyzabc789'
331
 
        self.nonce = '1 2 3 4 5'
332
 
        self.response = _calcResponse(
333
 
            self.challenge, self.nonce, self.password)
334
 
        self.credentials = _AMPUsernamePassword(
335
 
            self.username, self.challenge, self.nonce, self.response)
336
 
 
337
 
    def test_checkPasswordString(self):
338
 
        """
339
 
        L{_AMPUsernamePassword} accepts a C{str} for the known correct
340
 
        password and returns C{True} if the response matches it.
341
 
        """
342
 
        self.assertTrue(self.credentials.checkPassword(self.password))
343
 
 
344
 
 
345
 
    def test_checkInvalidPasswordString(self):
346
 
        """
347
 
        L{_AMPUsernamePassword} accepts a C{str} for the known correct
348
 
        password and returns C{False} if the response does not match it.
349
 
        """
350
 
        self.assertFalse(self.credentials.checkPassword('quux'))
351
 
 
352
 
 
353
 
    def test_checkPasswordUnicode(self):
354
 
        """
355
 
        L{_AMPUsernamePassword} accepts a C{unicode} for the known correct
356
 
        password and returns C{True} if the response matches the UTF-8 encoding
357
 
        of it.
358
 
        """
359
 
        self.assertTrue(
360
 
            self.credentials.checkPassword(self.password.decode('utf-8')))
361
 
 
362
 
 
363
 
    def test_checkInvalidPasswordUnicode(self):
364
 
        """
365
 
        L{_AMPUsernamePassword} accepts a C{unicode} for the known correct
366
 
        password and returns C{False} if the response does not match the UTF-8
367
 
        encoding of it.
368
 
        """
369
 
        self.assertFalse(
370
 
            self.credentials.checkPassword(
371
 
                u'\N{LATIN SMALL LETTER E WITH ACUTE}'))
372
 
 
373
 
 
374
 
 
375
 
class CredAMPServerFactoryTests(TestCase):
376
 
    """
377
 
    Tests for L{CredAMPServerFactory}.
378
 
    """
379
 
    def test_buildProtocol(self):
380
 
        """
381
 
        L{CredAMPServerFactory.buildProtocol} returns a L{CredReceiver}
382
 
        instance with its C{portal} attribute set to the portal object passed
383
 
        to L{CredAMPServerFactory.__init__}.
384
 
        """
385
 
        portal = object()
386
 
        factory = CredAMPServerFactory(portal)
387
 
        proto = factory.buildProtocol(None)
388
 
        self.assertIsInstance(proto, CredReceiver)
389
 
        self.assertIdentical(proto.portal, portal)
390
 
 
391
 
 
392
 
 
393
 
class OneTimePadCheckerTests(TestCase):
394
 
    """
395
 
    Tests for L{OneTimePadChecker}.
396
 
    """
397
 
    def test_requestAvatarId(self):
398
 
        """
399
 
        L{OneTimePadChecker.requestAvatarId} should return the username in the
400
 
        case the pad is valid.
401
 
        """
402
 
        PAD = 'test_requestAvatarId'
403
 
        USERNAME = 'test_requestAvatarId username'
404
 
        checker = OneTimePadChecker({PAD: USERNAME})
405
 
        self.assertEqual(
406
 
            checker.requestAvatarId(_AMPOneTimePad(PAD)), USERNAME)
407
 
 
408
 
 
409
 
    def test_requestAvatarIdUnauthorized(self):
410
 
        """
411
 
        L{OneTimePadChecker.requestAvatarId} should throw L{UnauthorizedLogin}
412
 
        if an unknown pad is given.
413
 
        """
414
 
        checker = OneTimePadChecker({})
415
 
        self.assertRaises(
416
 
            UnauthorizedLogin,
417
 
            lambda: checker.requestAvatarId(_AMPOneTimePad(None)))
418
 
 
419
 
 
420
 
    def test_oneTimePad(self):
421
 
        """
422
 
        L{OneTimePadChecker.requestAvatarId} should invalidate the pad if a
423
 
        login is successful.
424
 
        """
425
 
        PAD = 'test_requestAvatarId'
426
 
        checker = OneTimePadChecker({PAD: 'username'})
427
 
        checker.requestAvatarId(_AMPOneTimePad(PAD))
428
 
        self.assertRaises(
429
 
            UnauthorizedLogin,
430
 
            lambda: checker.requestAvatarId(_AMPOneTimePad(PAD)))