1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Now with 30% more starch.
8
from __future__ import generators
11
from zope.interface import implements, Interface
13
from twisted.trial import unittest
14
from twisted.cred import portal, checkers, credentials, error
15
from twisted.python import components
16
from twisted.python import util
17
from twisted.internet import defer
18
from twisted.internet.defer import deferredGenerator as dG, waitForDeferred as wFD
21
from crypt import crypt
26
from twisted.cred.pamauth import callIntoPAM
30
from twisted.cred import pamauth
32
class ITestable(Interface):
36
def __init__(self, name):
39
self.loggedOut = False
42
assert not self.loggedIn
48
class Testable(components.Adapter):
51
# components.Interface(TestAvatar).adaptWith(Testable, ITestable)
53
components.registerAdapter(Testable, TestAvatar, ITestable)
56
implements(portal.IRealm)
60
def requestAvatar(self, avatarId, mind, *interfaces):
61
if self.avatars.has_key(avatarId):
62
avatar = self.avatars[avatarId]
64
avatar = TestAvatar(avatarId)
65
self.avatars[avatarId] = avatar
67
return (interfaces[0], interfaces[0](avatar),
70
class NewCredTest(unittest.TestCase):
72
r = self.realm = TestRealm()
73
p = self.portal = portal.Portal(r)
74
up = self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
75
up.addUser("bob", "hello")
78
def testListCheckers(self):
79
expected = [credentials.IUsernamePassword, credentials.IUsernameHashedPassword]
80
got = self.portal.listCredentialsInterfaces()
83
self.assertEquals(got, expected)
85
def testBasicLogin(self):
87
self.portal.login(credentials.UsernamePassword("bob", "hello"),
88
self, ITestable).addCallback(
89
l.append).addErrback(f.append)
92
# print l[0].getBriefTraceback()
93
iface, impl, logout = l[0]
95
self.assertEquals(iface, ITestable)
96
self.failUnless(iface.providedBy(impl),
97
"%s does not implement %s" % (impl, iface))
99
self.failUnless(impl.original.loggedIn)
100
self.failUnless(not impl.original.loggedOut)
102
self.failUnless(impl.original.loggedOut)
104
def testFailedLogin(self):
106
self.portal.login(credentials.UsernamePassword("bob", "h3llo"),
107
self, ITestable).addErrback(
108
lambda x: x.trap(error.UnauthorizedLogin)).addCallback(l.append)
110
self.failUnlessEqual(error.UnauthorizedLogin, l[0])
112
def testFailedLoginName(self):
114
self.portal.login(credentials.UsernamePassword("jay", "hello"),
115
self, ITestable).addErrback(
116
lambda x: x.trap(error.UnauthorizedLogin)).addCallback(l.append)
118
self.failUnlessEqual(error.UnauthorizedLogin, l[0])
121
class CramMD5CredentialsTestCase(unittest.TestCase):
122
def testIdempotentChallenge(self):
123
c = credentials.CramMD5Credentials()
124
chal = c.getChallenge()
125
self.assertEquals(chal, c.getChallenge())
127
def testCheckPassword(self):
128
c = credentials.CramMD5Credentials()
129
chal = c.getChallenge()
130
c.response = hmac.HMAC('secret', chal).hexdigest()
131
self.failUnless(c.checkPassword('secret'))
133
def testWrongPassword(self):
134
c = credentials.CramMD5Credentials()
135
self.failIf(c.checkPassword('secret'))
137
class OnDiskDatabaseTestCase(unittest.TestCase):
145
def testUserLookup(self):
146
dbfile = self.mktemp()
147
db = checkers.FilePasswordDB(dbfile)
148
f = file(dbfile, 'w')
149
for (u, p) in self.users:
150
f.write('%s:%s\n' % (u, p))
153
for (u, p) in self.users:
154
self.failUnlessRaises(KeyError, db.getUser, u.upper())
155
self.assertEquals(db.getUser(u), (u, p))
157
def testCaseInSensitivity(self):
158
dbfile = self.mktemp()
159
db = checkers.FilePasswordDB(dbfile, caseSensitive=0)
160
f = file(dbfile, 'w')
161
for (u, p) in self.users:
162
f.write('%s:%s\n' % (u, p))
165
for (u, p) in self.users:
166
self.assertEquals(db.getUser(u.upper()), (u, p))
168
def testRequestAvatarId(self):
169
dbfile = self.mktemp()
170
db = checkers.FilePasswordDB(dbfile, caseSensitive=0)
171
f = file(dbfile, 'w')
172
for (u, p) in self.users:
173
f.write('%s:%s\n' % (u, p))
175
creds = [credentials.UsernamePassword(u, p) for u, p in self.users]
176
d = defer.gatherResults(
177
[defer.maybeDeferred(db.requestAvatarId, c) for c in creds])
178
d.addCallback(self.assertEquals, [u for u, p in self.users])
181
def testRequestAvatarId_hashed(self):
182
dbfile = self.mktemp()
183
db = checkers.FilePasswordDB(dbfile, caseSensitive=0)
184
f = file(dbfile, 'w')
185
for (u, p) in self.users:
186
f.write('%s:%s\n' % (u, p))
188
creds = [credentials.UsernameHashedPassword(u, p) for u, p in self.users]
189
d = defer.gatherResults(
190
[defer.maybeDeferred(db.requestAvatarId, c) for c in creds])
191
d.addCallback(self.assertEquals, [u for u, p in self.users])
196
class HashedPasswordOnDiskDatabaseTestCase(unittest.TestCase):
204
def hash(self, u, p, s):
208
dbfile = self.mktemp()
209
self.db = checkers.FilePasswordDB(dbfile, hash=self.hash)
210
f = file(dbfile, 'w')
211
for (u, p) in self.users:
212
f.write('%s:%s\n' % (u, crypt(p, u[:2])))
215
self.port = portal.Portal(r)
216
self.port.registerChecker(self.db)
218
def testGoodCredentials(self):
219
goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
220
d = defer.gatherResults([self.db.requestAvatarId(c) for c in goodCreds])
221
d.addCallback(self.assertEquals, [u for u, p in self.users])
224
def testGoodCredentials_login(self):
225
goodCreds = [credentials.UsernamePassword(u, p) for u, p in self.users]
226
d = defer.gatherResults([self.port.login(c, None, ITestable)
228
d.addCallback(lambda x: [a.original.name for i, a, l in x])
229
d.addCallback(self.assertEquals, [u for u, p in self.users])
232
def testBadCredentials(self):
233
badCreds = [credentials.UsernamePassword(u, 'wrong password')
234
for u, p in self.users]
235
d = defer.DeferredList([self.port.login(c, None, ITestable)
236
for c in badCreds], consumeErrors=True)
237
d.addCallback(self._assertFailures, error.UnauthorizedLogin)
240
def testHashedCredentials(self):
241
hashedCreds = [credentials.UsernameHashedPassword(u, crypt(p, u[:2]))
242
for u, p in self.users]
243
d = defer.DeferredList([self.port.login(c, None, ITestable)
244
for c in hashedCreds], consumeErrors=True)
245
d.addCallback(self._assertFailures, error.UnhandledCredentials)
248
def _assertFailures(self, failures, *expectedFailures):
249
for flag, failure in failures:
250
self.failUnlessEqual(flag, defer.FAILURE)
251
failure.trap(*expectedFailures)
255
skip = "crypt module not available"
257
class PluggableAuthenticationModulesTest(unittest.TestCase):
259
def setUpClass(self):
    """
    Replace C{pamauth.callIntoPAM} with this test case's own
    C{callIntoPAM}, keeping the original in C{self._oldCallIntoPAM} so that
    C{tearDownClass} can put it back.
    """
    self._oldCallIntoPAM = pamauth.callIntoPAM
    pamauth.callIntoPAM = self.callIntoPAM
263
def tearDownClass(self):
    """
    Restore the C{pamauth.callIntoPAM} that C{setUpClass} saved in
    C{self._oldCallIntoPAM}.
    """
    pamauth.callIntoPAM = self._oldCallIntoPAM
266
def callIntoPAM(self, service, user, conv):
267
if service != 'Twisted':
268
raise error.UnauthorizedLogin('bad service: %s' % service)
269
if user != 'testuser':
270
raise error.UnauthorizedLogin('bad username: %s' % user)
273
(2, "Message w/ Input"),
274
(3, "Message w/o Input"),
276
replies = conv(questions)
282
raise error.UnauthorizedLogin('bad conversion: %s' % repr(replies))
285
def _makeConv(self, d):
287
return defer.succeed([(d[t], 0) for t, q in questions])
290
def testRequestAvatarId(self):
291
db = checkers.PluggableAuthenticationModulesChecker()
292
conv = self._makeConv({1:'password', 2:'entry', 3:''})
293
creds = credentials.PluggableAuthenticationModules('testuser',
295
d = db.requestAvatarId(creds)
296
d.addCallback(self.assertEquals, 'testuser')
299
def testBadCredentials(self):
300
db = checkers.PluggableAuthenticationModulesChecker()
301
conv = self._makeConv({1:'', 2:'', 3:''})
302
creds = credentials.PluggableAuthenticationModules('testuser',
304
d = db.requestAvatarId(creds)
305
self.assertFailure(d, error.UnauthorizedLogin)
308
def testBadUsername(self):
309
db = checkers.PluggableAuthenticationModulesChecker()
310
conv = self._makeConv({1:'password', 2:'entry', 3:''})
311
creds = credentials.PluggableAuthenticationModules('baduser',
313
d = db.requestAvatarId(creds)
314
self.assertFailure(d, error.UnauthorizedLogin)
318
skip = "Can't run without PyPAM"
321
def testPositive(self):
322
for chk in self.getCheckers():
323
for (cred, avatarId) in self.getGoodCredentials():
324
r = wFD(chk.requestAvatarId(cred))
326
self.assertEquals(r.getResult(), avatarId)
327
testPositive = dG(testPositive)
329
def testNegative(self):
330
for chk in self.getCheckers():
331
for cred in self.getBadCredentials():
332
r = wFD(chk.requestAvatarId(cred))
334
self.assertRaises(error.UnauthorizedLogin, r.getResult)
335
testNegative = dG(testNegative)
337
class HashlessFilePasswordDBMixin:
338
credClass = credentials.UsernamePassword
340
networkHash = staticmethod(lambda x: x)
342
_validCredentials = [
343
('user1', 'password1'),
344
('user2', 'password2'),
345
('user3', 'password3')]
347
def getGoodCredentials(self):
    """
    Generate one C{(credentials, avatarId)} pair per entry of
    C{self._validCredentials}.

    The credentials object is built with C{self.credClass} from the
    username and the password after it has been passed through
    C{self.networkHash}; the avatar id is the username itself.
    """
    for u, p in self._validCredentials:
        yield self.credClass(u, self.networkHash(p)), u
351
def getBadCredentials(self):
352
for u, p in [('user1', 'password3'),
353
('user2', 'password1'),
355
yield self.credClass(u, self.networkHash(p))
357
def getCheckers(self):
358
diskHash = self.diskHash or (lambda x: x)
359
hashCheck = self.diskHash and (lambda username, password, stored: self.diskHash(password))
361
for cache in True, False:
364
for u, p in self._validCredentials:
365
fObj.write('%s:%s\n' % (u, diskHash(p)))
367
yield checkers.FilePasswordDB(fn, cache=cache, hash=hashCheck)
371
for u, p in self._validCredentials:
372
fObj.write('%s dingle dongle %s\n' % (diskHash(p), u))
374
yield checkers.FilePasswordDB(fn, ' ', 3, 0, cache=cache, hash=hashCheck)
378
for u, p in self._validCredentials:
379
fObj.write('zip,zap,%s,zup,%s\n' % (u.title(), diskHash(p)))
381
yield checkers.FilePasswordDB(fn, ',', 2, 4, False, cache=cache, hash=hashCheck)
383
class LocallyHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
    """
    Variant of L{HashlessFilePasswordDBMixin} in which C{diskHash}
    hex-encodes a password; the base mixin's C{getCheckers} reads
    C{self.diskHash} when it writes the password files.
    """
    # NOTE: the 'hex' string codec exists only on Python 2.
    diskHash = staticmethod(lambda x: x.encode('hex'))
386
class NetworkHashedFilePasswordDBMixin(HashlessFilePasswordDBMixin):
    """
    Variant of L{HashlessFilePasswordDBMixin} in which the password is
    hex-encoded before being placed in the credentials object.
    """
    # NOTE: the 'hex' string codec exists only on Python 2.
    networkHash = staticmethod(lambda x: x.encode('hex'))

    class credClass(credentials.UsernameHashedPassword):
        """
        Hashed-password credentials whose check undoes the hex encoding
        applied by C{networkHash}.
        """

        def checkPassword(self, password):
            """
            Return whether hex-decoding C{self.hashed} gives C{password}.
            """
            return self.hashed.decode('hex') == password
392
class HashlessFilePasswordDBCheckerTestCase(HashlessFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
395
class LocallyHashedFilePasswordDBCheckerTestCase(LocallyHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase):
398
class NetworkHashedFilePasswordDBCheckerTestCase(NetworkHashedFilePasswordDBMixin, CheckersMixin, unittest.TestCase):