# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.conch.checkers}.
"""
import base64
import os

# pwd is only available on POSIX platforms; the tests are skipped without it.
try:
    import pwd
except ImportError:
    pwd = None

from twisted.trial.unittest import TestCase
from twisted.python.filepath import FilePath
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
from twisted.cred.credentials import UsernamePassword, IUsernamePassword, \
    SSHPrivateKey, ISSHPrivateKey
from twisted.cred.error import UnhandledCredentials, UnauthorizedLogin
from twisted.python.fakepwd import UserDatabase
from twisted.test.test_process import MockOS

# The conch modules need optional crypto dependencies.  When they are
# missing, SSHPublicKeyDatabase is set to None so the test cases below can
# mark themselves as skipped.
# NOTE(review): the skip message also mentions PyASN1 -- confirm whether it
# should be probed here as well.
try:
    import Crypto.Cipher.DES3
except ImportError:
    SSHPublicKeyDatabase = None
else:
    from twisted.conch.ssh import keys
    from twisted.conch.checkers import SSHPublicKeyDatabase, SSHProtocolChecker
    from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey
    from twisted.conch.test import keydata
36
class SSHPublicKeyDatabaseTestCase(TestCase):
38
Tests for L{SSHPublicKeyDatabase}.
42
skip = "Cannot run without pwd module"
43
elif SSHPublicKeyDatabase is None:
44
skip = "Cannot run without PyCrypto or PyASN1"
47
self.checker = SSHPublicKeyDatabase()
48
self.key1 = base64.encodestring("foobar")
49
self.key2 = base64.encodestring("eggspam")
50
self.content = "t1 %s foo\nt2 %s egg\n" % (self.key1, self.key2)
52
self.mockos = MockOS()
53
self.mockos.path = FilePath(self.mktemp())
54
self.mockos.path.makedirs()
55
self.sshDir = self.mockos.path.child('.ssh')
56
self.sshDir.makedirs()
58
userdb = UserDatabase()
59
userdb.addUser('user', 'password', 1, 2, 'first last',
60
self.mockos.path.path, '/bin/shell')
62
self.patch(pwd, "getpwnam", userdb.getpwnam)
63
self.patch(os, "seteuid", self.mockos.seteuid)
64
self.patch(os, "setegid", self.mockos.setegid)
67
def _testCheckKey(self, filename):
68
self.sshDir.child(filename).setContent(self.content)
69
user = UsernamePassword("user", "password")
71
self.assertTrue(self.checker.checkKey(user))
73
self.assertTrue(self.checker.checkKey(user))
74
user.blob = "notallowed"
75
self.assertFalse(self.checker.checkKey(user))
78
def test_checkKey(self):
80
L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
81
authorized_keys file and check the keys against that file.
83
self._testCheckKey("authorized_keys")
84
self.assertEquals(self.mockos.seteuidCalls, [])
85
self.assertEquals(self.mockos.setegidCalls, [])
88
def test_checkKey2(self):
90
L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
91
authorized_keys2 file and check the keys against that file.
93
self._testCheckKey("authorized_keys2")
94
self.assertEquals(self.mockos.seteuidCalls, [])
95
self.assertEquals(self.mockos.setegidCalls, [])
98
def test_checkKeyAsRoot(self):
100
If the key file is readable, L{SSHPublicKeyDatabase.checkKey} should
101
switch its uid/gid to the ones of the authenticated user.
103
keyFile = self.sshDir.child("authorized_keys")
104
keyFile.setContent(self.content)
105
# Fake permission error by changing the mode
107
self.addCleanup(keyFile.chmod, 0777)
108
# And restore the right mode when seteuid is called
109
savedSeteuid = os.seteuid
112
return savedSeteuid(euid)
113
self.patch(os, "seteuid", seteuid)
114
user = UsernamePassword("user", "password")
116
self.assertTrue(self.checker.checkKey(user))
117
self.assertEquals(self.mockos.seteuidCalls, [0, 1, 0, os.getuid()])
118
self.assertEquals(self.mockos.setegidCalls, [2, os.getgid()])
121
def test_requestAvatarId(self):
123
L{SSHPublicKeyDatabase.requestAvatarId} should return the avatar id
124
passed in if its C{_checkKey} method returns True.
126
def _checkKey(ignored):
128
self.patch(self.checker, 'checkKey', _checkKey)
129
credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh,
130
'foo', keys.Key.fromString(keydata.privateRSA_openssh).sign('foo'))
131
d = self.checker.requestAvatarId(credentials)
132
def _verify(avatarId):
133
self.assertEquals(avatarId, 'test')
134
return d.addCallback(_verify)
137
def test_requestAvatarIdWithoutSignature(self):
139
L{SSHPublicKeyDatabase.requestAvatarId} should raise L{ValidPublicKey}
140
if the credentials represent a valid key without a signature. This
141
tells the user that the key is valid for login, but does not actually
142
allow that user to do so without a signature.
144
def _checkKey(ignored):
146
self.patch(self.checker, 'checkKey', _checkKey)
147
credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, None, None)
148
d = self.checker.requestAvatarId(credentials)
149
return self.assertFailure(d, ValidPublicKey)
152
def test_requestAvatarIdInvalidKey(self):
154
If L{SSHPublicKeyDatabase.checkKey} returns False,
155
C{_cbRequestAvatarId} should raise L{UnauthorizedLogin}.
157
def _checkKey(ignored):
159
self.patch(self.checker, 'checkKey', _checkKey)
160
d = self.checker.requestAvatarId(None);
161
return self.assertFailure(d, UnauthorizedLogin)
164
def test_requestAvatarIdInvalidSignature(self):
166
Valid keys with invalid signatures should cause
167
L{SSHPublicKeyDatabase.requestAvatarId} to return a {UnauthorizedLogin}
170
def _checkKey(ignored):
172
self.patch(self.checker, 'checkKey', _checkKey)
173
credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh,
174
'foo', keys.Key.fromString(keydata.privateDSA_openssh).sign('foo'))
175
d = self.checker.requestAvatarId(credentials)
176
return self.assertFailure(d, UnauthorizedLogin)
179
def test_requestAvatarIdNormalizeException(self):
181
Exceptions raised while verifying the key should be normalized into an
182
C{UnauthorizedLogin} failure.
184
def _checkKey(ignored):
186
self.patch(self.checker, 'checkKey', _checkKey)
187
credentials = SSHPrivateKey('test', None, 'blob', 'sigData', 'sig')
188
d = self.checker.requestAvatarId(credentials)
189
def _verifyLoggedException(failure):
190
errors = self.flushLoggedErrors(keys.BadKeyError)
191
self.assertEqual(len(errors), 1)
193
d.addErrback(_verifyLoggedException)
194
return self.assertFailure(d, UnauthorizedLogin)
197
class SSHProtocolCheckerTestCase(TestCase):
    """
    Tests for L{SSHProtocolChecker}.
    """

    if SSHPublicKeyDatabase is None:
        skip = "Cannot run without PyCrypto"


    def test_registerChecker(self):
        """
        L{SSHProtocolChecker.registerChecker} should add the given checker to
        the list of registered checkers.
        """
        checker = SSHProtocolChecker()
        self.assertEqual(checker.credentialInterfaces, [])
        checker.registerChecker(SSHPublicKeyDatabase())
        self.assertEqual(checker.credentialInterfaces, [ISSHPrivateKey])
        self.assertIsInstance(checker.checkers[ISSHPrivateKey],
                              SSHPublicKeyDatabase)


    def test_registerCheckerWithInterface(self):
        """
        If a specific interface is passed into
        L{SSHProtocolChecker.registerChecker}, that interface should be
        registered instead of what the checker specifies in
        C{credentialInterfaces}.
        """
        checker = SSHProtocolChecker()
        self.assertEqual(checker.credentialInterfaces, [])
        checker.registerChecker(SSHPublicKeyDatabase(), IUsernamePassword)
        self.assertEqual(checker.credentialInterfaces, [IUsernamePassword])
        self.assertIsInstance(checker.checkers[IUsernamePassword],
                              SSHPublicKeyDatabase)


    def test_requestAvatarId(self):
        """
        L{SSHProtocolChecker.requestAvatarId} should defer to one of its
        registered checkers to authenticate a user.
        """
        checker = SSHProtocolChecker()
        passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse()
        passwordDatabase.addUser('test', 'test')
        checker.registerChecker(passwordDatabase)
        d = checker.requestAvatarId(UsernamePassword('test', 'test'))
        def _callback(avatarId):
            self.assertEqual(avatarId, 'test')
        return d.addCallback(_callback)


    def test_requestAvatarIdWithNotEnoughAuthentication(self):
        """
        If the client indicates that it is never satisfied, by always returning
        False from _areDone, then L{SSHProtocolChecker} should raise
        L{NotEnoughAuthentication}.
        """
        checker = SSHProtocolChecker()
        def _areDone(avatarId):
            return False
        self.patch(checker, 'areDone', _areDone)

        passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse()
        passwordDatabase.addUser('test', 'test')
        checker.registerChecker(passwordDatabase)
        d = checker.requestAvatarId(UsernamePassword('test', 'test'))
        return self.assertFailure(d, NotEnoughAuthentication)


    def test_requestAvatarIdInvalidCredential(self):
        """
        If the passed credentials aren't handled by any registered checker,
        L{SSHProtocolChecker} should raise L{UnhandledCredentials}.
        """
        # No checker is registered, so nothing can handle the credentials.
        checker = SSHProtocolChecker()
        d = checker.requestAvatarId(UsernamePassword('test', 'test'))
        return self.assertFailure(d, UnhandledCredentials)


    def test_areDone(self):
        """
        The default L{SSHProtocolChecker.areDone} should simply return True.
        """
        self.assertEqual(SSHProtocolChecker().areDone(None), True)