~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/test/test_newcred.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
"""
5
 
Now with 30% more starch.
6
 
"""
7
 
 
8
 
from __future__ import generators
9
 
 
10
 
import hmac
11
 
from zope.interface import implements, Interface
12
 
 
13
 
from twisted.trial import unittest
14
 
from twisted.cred import portal, checkers, credentials, error
15
 
from twisted.python import components
16
 
from twisted.python import util
17
 
from twisted.internet import defer
18
 
from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD
19
 
 
20
 
try:
21
 
    from crypt import crypt
22
 
except ImportError:
23
 
    crypt = None
24
 
 
25
 
try:
26
 
    from twisted.cred.pamauth import callIntoPAM
27
 
except ImportError:
28
 
    pamauth = None
29
 
else:
30
 
    from twisted.cred import pamauth
31
 
 
32
 
class ITestable(Interface):
    """Marker interface that the test realm's avatars are adapted to."""
34
 
 
35
 
class TestAvatar:
    """Avatar stand-in that records whether login() and logout() were called."""

    def __init__(self, name):
        self.name = name
        # Neither transition has happened on a freshly created avatar.
        self.loggedIn = self.loggedOut = False

    def login(self):
        """Flag the avatar as logged in; logging in twice trips the assertion."""
        assert not self.loggedIn
        self.loggedIn = True

    def logout(self):
        """Flag the avatar as logged out (the loggedIn flag is left untouched)."""
        self.loggedOut = True
47
 
 
48
 
class Testable(components.Adapter):
    """Adapter that declares ITestable on behalf of the object it wraps."""
    implements(ITestable)
50
 
 
51
 
# components.Interface(TestAvatar).adaptWith(Testable, ITestable)
52
 
 
53
 
# Register Testable as the adapter from TestAvatar to ITestable, so that
# calling ITestable(avatar) (as TestRealm.requestAvatar does) wraps the avatar.
components.registerAdapter(Testable, TestAvatar, ITestable)
54
 
 
55
 
class TestRealm:
    """
    Realm that keeps one TestAvatar per avatar ID, creating it the first time
    that ID is requested and reusing it afterwards.
    """
    implements(portal.IRealm)

    def __init__(self):
        # Maps avatarId -> the TestAvatar created for it.
        self.avatars = {}

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Log in the avatar for avatarId and adapt it to the first requested
        interface.

        mind is accepted but not used.  Only interfaces[0] is considered.

        Returns a 3-tuple: (interface, avatar adapted to that interface,
        the avatar's logout method).
        """
        # Plain lookup with a KeyError fallback replaces dict.has_key(),
        # which is deprecated and no longer exists in Python 3.
        try:
            avatar = self.avatars[avatarId]
        except KeyError:
            avatar = self.avatars[avatarId] = TestAvatar(avatarId)
        # TestAvatar.login() asserts that it has not been logged in before.
        avatar.login()
        return (interfaces[0], interfaces[0](avatar), avatar.logout)
69
 
 
70
 
class NewCredTest(unittest.TestCase):
    """Portal logins checked against an in-memory username/password checker."""

    def setUp(self):
        self.realm = TestRealm()
        self.portal = portal.Portal(self.realm)
        self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
        self.checker.addUser("bob", "hello")
        self.portal.registerChecker(self.checker)

    def testListCheckers(self):
        """The portal lists both credential interfaces of the checker."""
        expected = [credentials.IUsernamePassword,
                    credentials.IUsernameHashedPassword]
        expected.sort()
        got = self.portal.listCredentialsInterfaces()
        got.sort()
        self.assertEquals(got, expected)

    def testBasicLogin(self):
        """A correct password yields (interface, avatar, logout)."""
        results = []
        failures = []
        creds = credentials.UsernamePassword("bob", "hello")
        d = self.portal.login(creds, self, ITestable)
        d.addCallback(results.append)
        d.addErrback(failures.append)
        # The test relies on the Deferred having fired by this point.
        if failures:
            raise failures[0]
        iface, impl, logout = results[0]

        # The returned interface is the one asked for, and the returned
        # object provides it.
        self.assertEquals(iface, ITestable)
        self.failUnless(iface.providedBy(impl),
                        "%s does not implement %s" % (impl, iface))

        # Peek at the wrapped TestAvatar to follow the login/logout flags.
        wrapped = impl.original
        self.failUnless(wrapped.loggedIn)
        self.failIf(wrapped.loggedOut)
        logout()
        self.failUnless(wrapped.loggedOut)

    def testFailedLogin(self):
        """A wrong password fails with UnauthorizedLogin."""
        trapped = []
        creds = credentials.UsernamePassword("bob", "h3llo")
        d = self.portal.login(creds, self, ITestable)
        # Whatever trap() returns is handed on to the callback; the test
        # expects that to be the UnauthorizedLogin class itself.
        d.addErrback(lambda reason: reason.trap(error.UnauthorizedLogin))
        d.addCallback(trapped.append)
        self.failUnless(trapped)
        self.failUnlessEqual(error.UnauthorizedLogin, trapped[0])

    def testFailedLoginName(self):
        """An unknown username fails with UnauthorizedLogin."""
        trapped = []
        creds = credentials.UsernamePassword("jay", "hello")
        d = self.portal.login(creds, self, ITestable)
        d.addErrback(lambda reason: reason.trap(error.UnauthorizedLogin))
        d.addCallback(trapped.append)
        self.failUnless(trapped)
        self.failUnlessEqual(error.UnauthorizedLogin, trapped[0])
119
 
 
120
 
 
121
 
class CramMD5CredentialsTestCase(unittest.TestCase):
    """Checks on credentials.CramMD5Credentials."""

    def testIdempotentChallenge(self):
        """Asking for the challenge twice gives the same value."""
        creds = credentials.CramMD5Credentials()
        first = creds.getChallenge()
        self.assertEquals(first, creds.getChallenge())

    def testCheckPassword(self):
        """An HMAC hex digest of the challenge keyed by the password is accepted."""
        creds = credentials.CramMD5Credentials()
        challenge = creds.getChallenge()
        mac = hmac.HMAC('secret', challenge)
        creds.response = mac.hexdigest()
        self.failUnless(creds.checkPassword('secret'))

    def testWrongPassword(self):
        """Credentials with no response set reject the password."""
        creds = credentials.CramMD5Credentials()
        self.failIf(creds.checkPassword('secret'))
136
 
 
137
 
class OnDiskDatabaseTestCase(unittest.TestCase):
    """
    Tests for checkers.FilePasswordDB reading plain "username:password" lines
    from a file on disk.
    """
    users = [
        ('user1', 'pass1'),
        ('user2', 'pass2'),
        ('user3', 'pass3'),
    ]

    def _makeDB(self, **kwargs):
        """
        Build a FilePasswordDB over a fresh temporary file and write one
        "user:password" line per entry of self.users into that file.

        kwargs are passed through to the FilePasswordDB constructor
        (the tests use caseSensitive=0).  Returns the database object.
        """
        dbfile = self.mktemp()
        # As in the original tests, the checker is constructed before the
        # file it points at has been written.
        db = checkers.FilePasswordDB(dbfile, **kwargs)
        # open() instead of the Python-2-only file() builtin; try/finally so
        # the handle is closed even when a write fails.
        f = open(dbfile, 'w')
        try:
            for (u, p) in self.users:
                f.write('%s:%s\n' % (u, p))
        finally:
            f.close()
        return db

    def _checkAvatarIds(self, credentialsFactory):
        """
        Request an avatar ID for every user with credentials built by
        credentialsFactory(username, password) against a case-insensitive
        database, and expect the usernames back in order.

        Returns the Deferred for the test to return.
        """
        db = self._makeDB(caseSensitive=0)
        creds = [credentialsFactory(u, p) for u, p in self.users]
        d = defer.gatherResults(
            [defer.maybeDeferred(db.requestAvatarId, c) for c in creds])
        d.addCallback(self.assertEquals, [u for u, p in self.users])
        return d

    def testUserLookup(self):
        """By default the upper-cased name is a KeyError, the exact name is found."""
        db = self._makeDB()
        for (u, p) in self.users:
            self.failUnlessRaises(KeyError, db.getUser, u.upper())
            self.assertEquals(db.getUser(u), (u, p))

    def testCaseInSensitivity(self):
        """With caseSensitive=0 the upper-cased name finds the stored entry."""
        db = self._makeDB(caseSensitive=0)
        for (u, p) in self.users:
            self.assertEquals(db.getUser(u.upper()), (u, p))

    def testRequestAvatarId(self):
        """UsernamePassword credentials resolve to the matching usernames."""
        return self._checkAvatarIds(credentials.UsernamePassword)

    def testRequestAvatarId_hashed(self):
        """UsernameHashedPassword credentials resolve to the matching usernames."""
        return self._checkAvatarIds(credentials.UsernameHashedPassword)
193
 
 
194
 
 
195
 
 
196
 
class HashedPasswordOnDiskDatabaseTestCase(unittest.TestCase):
    """
    Tests for checkers.FilePasswordDB when the file stores crypt()ed
    passwords and the checker is given a hash callback.
    """
    users = [
        ('user1', 'pass1'),
        ('user2', 'pass2'),
        ('user3', 'pass3'),
    ]

    def hash(self, u, p, s):
        """Hash callback given to FilePasswordDB: crypt(p, s); u is ignored."""
        return crypt(p, s)

    def setUp(self):
        """
        Write a "user:crypt(password)" file, build a FilePasswordDB over it
        and register that checker with a portal backed by a TestRealm.
        """
        dbfile = self.mktemp()
        self.db = checkers.FilePasswordDB(dbfile, hash=self.hash)
        # open() instead of the Python-2-only file() builtin; try/finally so
        # the handle is closed even when crypt() or a write raises.
        f = open(dbfile, 'w')
        try:
            for (u, p) in self.users:
                # The first two characters of the username are the salt.
                f.write('%s:%s\n' % (u, crypt(p, u[:2])))
        finally:
            f.close()
        r = TestRealm()
        self.port = portal.Portal(r)
        self.port.registerChecker(self.db)

    def testGoodCredentials(self):
        """Plaintext credentials resolve to the usernames via the checker."""
        goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
        d = defer.gatherResults([self.db.requestAvatarId(c) for c in goodCreds])
        d.addCallback(self.assertEquals, [u for u, p in self.users])
        return d

    def testGoodCredentials_login(self):
        """Plaintext credentials log in through the portal."""
        goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
        d = defer.gatherResults([self.port.login(c, None, ITestable)
                                 for c in goodCreds])
        # Each login result is (interface, avatar, logout); compare the names
        # of the wrapped avatars.
        d.addCallback(lambda x: [a.original.name for i, a, l in x])
        d.addCallback(self.assertEquals, [u for u, p in self.users])
        return d

    def testBadCredentials(self):
        """A wrong password fails every login with UnauthorizedLogin."""
        badCreds = [credentials.UsernamePassword(u, 'wrong password')
                    for u, p in self.users]
        d = defer.DeferredList([self.port.login(c, None, ITestable)
                                for c in badCreds], consumeErrors=True)
        d.addCallback(self._assertFailures, error.UnauthorizedLogin)
        return d

    def testHashedCredentials(self):
        """
        Logging in with UsernameHashedPassword credentials is expected to
        fail with UnhandledCredentials.
        """
        hashedCreds = [credentials.UsernameHashedPassword(u, crypt(p, u[:2]))
                       for u, p in self.users]
        d = defer.DeferredList([self.port.login(c, None, ITestable)
                                for c in hashedCreds], consumeErrors=True)
        d.addCallback(self._assertFailures, error.UnhandledCredentials)
        return d

    def _assertFailures(self, failures, *expectedFailures):
        """
        Check a DeferredList result: every (flag, failure) pair must carry
        defer.FAILURE and a failure of one of expectedFailures.
        """
        for flag, failure in failures:
            self.failUnlessEqual(flag, defer.FAILURE)
            failure.trap(*expectedFailures)
        return None

    # The module-level import sets crypt to None when the crypt module is
    # missing; skip the whole class in that case.
    if crypt is None:
        skip = "crypt module not available"
256
 
 
257
 
class PluggableAuthenticationModulesTest(unittest.TestCase):
    """
    Tests for checkers.PluggableAuthenticationModulesChecker with
    pamauth.callIntoPAM replaced by the fake defined on this class.
    """

    def setUpClass(self):
        # Swap in the fake, remembering the real function for tearDownClass.
        self._oldCallIntoPAM = pamauth.callIntoPAM
        pamauth.callIntoPAM = self.callIntoPAM

    def tearDownClass(self):
        pamauth.callIntoPAM = self._oldCallIntoPAM

    def callIntoPAM(self, service, user, conv):
        """
        Fake callIntoPAM: accept only service 'Twisted', user 'testuser' and
        the three expected conversation replies; otherwise raise
        UnauthorizedLogin.  Returns 1 on success.
        """
        if service != 'Twisted':
            raise error.UnauthorizedLogin('bad service: %s' % service)
        if user != 'testuser':
            raise error.UnauthorizedLogin('bad username: %s' % user)
        prompts = [
            (1, "Password"),
            (2, "Message w/ Input"),
            (3, "Message w/o Input"),
        ]
        expectedReplies = [
            ("password", 0),
            ("entry", 0),
            ("", 0),
        ]
        replies = conv(prompts)
        if replies != expectedReplies:
            raise error.UnauthorizedLogin('bad conversion: %s' % repr(replies))
        return 1

    def _makeConv(self, d):
        """
        Build a conversation function that answers each (type, question)
        pair with (d[type], 0), delivered through an already-fired Deferred.
        """
        def conv(questions):
            answers = [(d[t], 0) for t, q in questions]
            return defer.succeed(answers)
        return conv

    def _requestAvatarId(self, username, answers):
        """Ask a new PAM checker for the avatar ID of username; returns the Deferred."""
        db = checkers.PluggableAuthenticationModulesChecker()
        conv = self._makeConv(answers)
        creds = credentials.PluggableAuthenticationModules(username, conv)
        return db.requestAvatarId(creds)

    def testRequestAvatarId(self):
        """The expected user and replies yield the username as avatar ID."""
        d = self._requestAvatarId('testuser', {1:'password', 2:'entry', 3:''})
        d.addCallback(self.assertEquals, 'testuser')
        return d

    def testBadCredentials(self):
        """Empty replies fail with UnauthorizedLogin."""
        d = self._requestAvatarId('testuser', {1:'', 2:'', 3:''})
        self.assertFailure(d, error.UnauthorizedLogin)
        return d

    def testBadUsername(self):
        """An unexpected username fails with UnauthorizedLogin."""
        d = self._requestAvatarId('baduser', {1:'password', 2:'entry', 3:''})
        self.assertFailure(d, error.UnauthorizedLogin)
        return d

    # pamauth is None when the guarded module-level import failed.
    if not pamauth:
        skip = "Can't run without PyPAM"
319
 
 
320
 
class CheckersMixin:
    """
    Shared tests for classes that supply getCheckers(), getGoodCredentials()
    and getBadCredentials().
    """

    def testPositive(self):
        """Each checker returns the expected avatar ID for each good credential."""
        for checker in self.getCheckers():
            for (goodCred, expectedId) in self.getGoodCredentials():
                pending = wFD(checker.requestAvatarId(goodCred))
                yield pending
                self.assertEquals(pending.getResult(), expectedId)
    testPositive = dG(testPositive)

    def testNegative(self):
        """Each checker fails each bad credential with UnauthorizedLogin."""
        for checker in self.getCheckers():
            for badCred in self.getBadCredentials():
                pending = wFD(checker.requestAvatarId(badCred))
                yield pending
                # getResult() is expected to raise the login failure.
                self.assertRaises(error.UnauthorizedLogin, pending.getResult)
    testNegative = dG(testNegative)
336
 
 
337
 
class HashlessFilePasswordDBMixin:
    """
    Supplies checkers and credentials for CheckersMixin, using
    checkers.FilePasswordDB over several password-file layouts.

    Subclasses override diskHash (applied to passwords written to disk),
    networkHash (applied to passwords put into credentials) and credClass.
    """
    credClass = credentials.UsernamePassword
    diskHash = None
    networkHash = staticmethod(lambda x: x)

    _validCredentials = [
        ('user1', 'password1'),
        ('user2', 'password2'),
        ('user3', 'password3')]

    def getGoodCredentials(self):
        """Yield (credentials, expected avatar ID) for every valid user."""
        for u, p in self._validCredentials:
            yield self.credClass(u, self.networkHash(p)), u

    def getBadCredentials(self):
        """Yield credentials with mismatched passwords or an unknown user."""
        for u, p in [('user1', 'password3'),
                     ('user2', 'password1'),
                     ('bloof', 'blarf')]:
            yield self.credClass(u, self.networkHash(p))

    def _writeTempFile(self, lines):
        """
        Write lines to a fresh temporary file and return its name.

        Uses open() rather than the Python-2-only file() builtin, and
        try/finally so the handle is closed even when a write fails.
        """
        fn = self.mktemp()
        fObj = open(fn, 'w')
        try:
            for line in lines:
                fObj.write(line)
        finally:
            fObj.close()
        return fn

    def getCheckers(self):
        """
        Yield FilePasswordDB checkers over three file layouts, each once with
        cache=True and once with cache=False.
        """
        diskHash = self.diskHash or (lambda x: x)
        # With no diskHash configured, no hash callback is passed on.
        if self.diskHash:
            hashCheck = lambda username, password, stored: self.diskHash(password)
        else:
            hashCheck = None

        for cache in True, False:
            # Layout 1: "user:password".
            fn = self._writeTempFile(
                ['%s:%s\n' % (u, diskHash(p))
                 for u, p in self._validCredentials])
            yield checkers.FilePasswordDB(fn, cache=cache, hash=hashCheck)

            # Layout 2: space separated, password first and username fourth.
            # The positional arguments presumably select the delimiter and
            # the two field indexes -- confirm against FilePasswordDB.
            fn = self._writeTempFile(
                ['%s dingle dongle %s\n' % (diskHash(p), u)
                 for u, p in self._validCredentials])
            yield checkers.FilePasswordDB(fn, ' ', 3, 0, cache=cache, hash=hashCheck)

            # Layout 3: comma separated with title-cased usernames; the
            # trailing False presumably turns off case sensitivity -- confirm.
            fn = self._writeTempFile(
                ['zip,zap,%s,zup,%s\n' % (u.title(), diskHash(p))
                 for u, p in self._validCredentials])
            yield checkers.FilePasswordDB(fn, ',', 2, 4, False, cache=cache, hash=hashCheck)
382
 
 
383
 
class LocallyHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
    """Variant whose diskHash hex-encodes its argument."""

    def diskHash(x):
        return x.encode('hex')
    diskHash = staticmethod(diskHash)
385
 
 
386
 
class NetworkHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
    """Variant whose credentials carry hex-encoded passwords."""

    def networkHash(x):
        return x.encode('hex')
    networkHash = staticmethod(networkHash)

    class credClass(credentials.UsernameHashedPassword):
        """Hashed-password credentials that hex-decode before comparing."""

        def checkPassword(self, password):
            decoded = self.hashed.decode('hex')
            return decoded == password
391
 
 
392
 
class HashlessFilePasswordDBCheckerTestCase(HashlessFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
    """Runs the CheckersMixin tests with the HashlessFilePasswordDBMixin setup."""
394
 
 
395
 
class LocallyHashedFilePasswordDBCheckerTestCase(LocallyHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
    """Runs the CheckersMixin tests with the LocallyHashedFilePasswordDBMixin setup."""
397
 
 
398
 
class NetworkHashedFilePasswordDBCheckerTestCase(NetworkHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
    """Runs the CheckersMixin tests with the NetworkHashedFilePasswordDBMixin setup."""