~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/conch/test/test_checkers.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.conch.checkers}.
 
6
"""
 
7
 
 
8
try:
 
9
    import pwd
 
10
except ImportError:
 
11
    pwd = None
 
12
 
 
13
import os, base64
 
14
 
 
15
from twisted.trial.unittest import TestCase
 
16
from twisted.python.filepath import FilePath
 
17
from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
 
18
from twisted.cred.credentials import UsernamePassword, IUsernamePassword, \
 
19
    SSHPrivateKey, ISSHPrivateKey
 
20
from twisted.cred.error import UnhandledCredentials, UnauthorizedLogin
 
21
from twisted.python.fakepwd import UserDatabase
 
22
from twisted.test.test_process import MockOS
 
23
 
 
24
try:
 
25
    import Crypto.Cipher.DES3
 
26
    import pyasn1
 
27
except ImportError:
 
28
    SSHPublicKeyDatabase = None
 
29
else:
 
30
    from twisted.conch.ssh import keys
 
31
    from twisted.conch.checkers import SSHPublicKeyDatabase, SSHProtocolChecker
 
32
    from twisted.conch.error import NotEnoughAuthentication, ValidPublicKey
 
33
    from twisted.conch.test import keydata
 
34
 
 
35
 
 
36
class SSHPublicKeyDatabaseTestCase(TestCase):
 
37
    """
 
38
    Tests for L{SSHPublicKeyDatabase}.
 
39
    """
 
40
 
 
41
    if pwd is None:
 
42
        skip = "Cannot run without pwd module"
 
43
    elif SSHPublicKeyDatabase is None:
 
44
        skip = "Cannot run without PyCrypto or PyASN1"
 
45
 
 
46
    def setUp(self):
 
47
        self.checker = SSHPublicKeyDatabase()
 
48
        self.key1 = base64.encodestring("foobar")
 
49
        self.key2 = base64.encodestring("eggspam")
 
50
        self.content = "t1 %s foo\nt2 %s egg\n" % (self.key1, self.key2)
 
51
 
 
52
        self.mockos = MockOS()
 
53
        self.mockos.path = FilePath(self.mktemp())
 
54
        self.mockos.path.makedirs()
 
55
        self.sshDir = self.mockos.path.child('.ssh')
 
56
        self.sshDir.makedirs()
 
57
 
 
58
        userdb = UserDatabase()
 
59
        userdb.addUser('user', 'password', 1, 2, 'first last',
 
60
                self.mockos.path.path, '/bin/shell')
 
61
 
 
62
        self.patch(pwd, "getpwnam", userdb.getpwnam)
 
63
        self.patch(os, "seteuid", self.mockos.seteuid)
 
64
        self.patch(os, "setegid", self.mockos.setegid)
 
65
 
 
66
 
 
67
    def _testCheckKey(self, filename):
 
68
        self.sshDir.child(filename).setContent(self.content)
 
69
        user = UsernamePassword("user", "password")
 
70
        user.blob = "foobar"
 
71
        self.assertTrue(self.checker.checkKey(user))
 
72
        user.blob = "eggspam"
 
73
        self.assertTrue(self.checker.checkKey(user))
 
74
        user.blob = "notallowed"
 
75
        self.assertFalse(self.checker.checkKey(user))
 
76
 
 
77
 
 
78
    def test_checkKey(self):
 
79
        """
 
80
        L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
 
81
        authorized_keys file and check the keys against that file.
 
82
        """
 
83
        self._testCheckKey("authorized_keys")
 
84
        self.assertEquals(self.mockos.seteuidCalls, [])
 
85
        self.assertEquals(self.mockos.setegidCalls, [])
 
86
 
 
87
 
 
88
    def test_checkKey2(self):
 
89
        """
 
90
        L{SSHPublicKeyDatabase.checkKey} should retrieve the content of the
 
91
        authorized_keys2 file and check the keys against that file.
 
92
        """
 
93
        self._testCheckKey("authorized_keys2")
 
94
        self.assertEquals(self.mockos.seteuidCalls, [])
 
95
        self.assertEquals(self.mockos.setegidCalls, [])
 
96
 
 
97
 
 
98
    def test_checkKeyAsRoot(self):
 
99
        """
 
100
        If the key file is unreadable, L{SSHPublicKeyDatabase.checkKey} should
 
101
        switch its uid/gid to the ones of the authenticated user.
 
102
        """
 
103
        keyFile = self.sshDir.child("authorized_keys")
 
104
        keyFile.setContent(self.content)
 
105
        # Fake permission error by changing the mode
 
106
        keyFile.chmod(0000)
 
107
        self.addCleanup(keyFile.chmod, 0777)
 
108
        # And restore the right mode when seteuid is called
 
109
        savedSeteuid = os.seteuid
 
110
        def seteuid(euid):
 
111
            keyFile.chmod(0777)
 
112
            return savedSeteuid(euid)
 
113
        self.patch(os, "seteuid", seteuid)
 
114
        user = UsernamePassword("user", "password")
 
115
        user.blob = "foobar"
 
116
        self.assertTrue(self.checker.checkKey(user))
 
117
        self.assertEquals(self.mockos.seteuidCalls, [0, 1, 0, os.getuid()])
 
118
        self.assertEquals(self.mockos.setegidCalls, [2, os.getgid()])
 
119
 
 
120
 
 
121
    def test_requestAvatarId(self):
 
122
        """
 
123
        L{SSHPublicKeyDatabase.requestAvatarId} should return the avatar id
 
124
        passed in if its C{checkKey} method returns True.
 
125
        """
 
126
        def _checkKey(ignored):
 
127
            return True
 
128
        self.patch(self.checker, 'checkKey', _checkKey)
 
129
        credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh,
 
130
                                    'foo', keys.Key.fromString(keydata.privateRSA_openssh).sign('foo'))
 
131
        d = self.checker.requestAvatarId(credentials)
 
132
        def _verify(avatarId):
 
133
            self.assertEquals(avatarId, 'test')
 
134
        return d.addCallback(_verify)
 
135
 
 
136
 
 
137
    def test_requestAvatarIdWithoutSignature(self):
 
138
        """
 
139
        L{SSHPublicKeyDatabase.requestAvatarId} should raise L{ValidPublicKey}
 
140
        if the credentials represent a valid key without a signature.  This
 
141
        tells the user that the key is valid for login, but does not actually
 
142
        allow that user to do so without a signature.
 
143
        """
 
144
        def _checkKey(ignored):
 
145
            return True
 
146
        self.patch(self.checker, 'checkKey', _checkKey)
 
147
        credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh, None, None)
 
148
        d = self.checker.requestAvatarId(credentials)
 
149
        return self.assertFailure(d, ValidPublicKey)
 
150
 
 
151
 
 
152
    def test_requestAvatarIdInvalidKey(self):
 
153
        """
 
154
        If L{SSHPublicKeyDatabase.checkKey} returns False,
 
155
        C{_cbRequestAvatarId} should raise L{UnauthorizedLogin}.
 
156
        """
 
157
        def _checkKey(ignored):
 
158
            return False
 
159
        self.patch(self.checker, 'checkKey', _checkKey)
 
160
        d = self.checker.requestAvatarId(None);
 
161
        return self.assertFailure(d, UnauthorizedLogin)
 
162
 
 
163
 
 
164
    def test_requestAvatarIdInvalidSignature(self):
 
165
        """
 
166
        Valid keys with invalid signatures should cause
 
167
        L{SSHPublicKeyDatabase.requestAvatarId} to return an L{UnauthorizedLogin}
 
168
        failure.
 
169
        """
 
170
        def _checkKey(ignored):
 
171
            return True
 
172
        self.patch(self.checker, 'checkKey', _checkKey)
 
173
        credentials = SSHPrivateKey('test', 'ssh-rsa', keydata.publicRSA_openssh,
 
174
                                    'foo', keys.Key.fromString(keydata.privateDSA_openssh).sign('foo'))
 
175
        d = self.checker.requestAvatarId(credentials)
 
176
        return self.assertFailure(d, UnauthorizedLogin)
 
177
 
 
178
 
 
179
    def test_requestAvatarIdNormalizeException(self):
 
180
        """
 
181
        Exceptions raised while verifying the key should be normalized into an
 
182
        C{UnauthorizedLogin} failure.
 
183
        """
 
184
        def _checkKey(ignored):
 
185
            return True
 
186
        self.patch(self.checker, 'checkKey', _checkKey)
 
187
        credentials = SSHPrivateKey('test', None, 'blob', 'sigData', 'sig')
 
188
        d = self.checker.requestAvatarId(credentials)
 
189
        def _verifyLoggedException(failure):
 
190
            errors = self.flushLoggedErrors(keys.BadKeyError)
 
191
            self.assertEqual(len(errors), 1)
 
192
            return failure
 
193
        d.addErrback(_verifyLoggedException)
 
194
        return self.assertFailure(d, UnauthorizedLogin)
 
195
 
 
196
 
 
197
class SSHProtocolCheckerTestCase(TestCase):
 
198
    """
 
199
    Tests for L{SSHProtocolChecker}.
 
200
    """
 
201
 
 
202
    if SSHPublicKeyDatabase is None:
 
203
        skip = "Cannot run without PyCrypto"
 
204
 
 
205
    def test_registerChecker(self):
 
206
        """
 
207
        L{SSHProtocolChecker.registerChecker} should add the given checker to
 
208
        the list of registered checkers.
 
209
        """
 
210
        checker = SSHProtocolChecker()
 
211
        self.assertEquals(checker.credentialInterfaces, [])
 
212
        checker.registerChecker(SSHPublicKeyDatabase(), )
 
213
        self.assertEquals(checker.credentialInterfaces, [ISSHPrivateKey])
 
214
        self.assertIsInstance(checker.checkers[ISSHPrivateKey],
 
215
                              SSHPublicKeyDatabase)
 
216
 
 
217
 
 
218
    def test_registerCheckerWithInterface(self):
 
219
        """
 
220
        If a specific interface is passed into
 
221
        L{SSHProtocolChecker.registerChecker}, that interface should be
 
222
        registered instead of what the checker specifies in
 
223
        credentialInterfaces.
 
224
        """
 
225
        checker = SSHProtocolChecker()
 
226
        self.assertEquals(checker.credentialInterfaces, [])
 
227
        checker.registerChecker(SSHPublicKeyDatabase(), IUsernamePassword)
 
228
        self.assertEquals(checker.credentialInterfaces, [IUsernamePassword])
 
229
        self.assertIsInstance(checker.checkers[IUsernamePassword],
 
230
                              SSHPublicKeyDatabase)
 
231
 
 
232
 
 
233
    def test_requestAvatarId(self):
 
234
        """
 
235
        L{SSHProtocolChecker.requestAvatarId} should defer to one of its
 
236
        registered checkers to authenticate a user.
 
237
        """
 
238
        checker = SSHProtocolChecker()
 
239
        passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse()
 
240
        passwordDatabase.addUser('test', 'test')
 
241
        checker.registerChecker(passwordDatabase)
 
242
        d = checker.requestAvatarId(UsernamePassword('test', 'test'))
 
243
        def _callback(avatarId):
 
244
            self.assertEquals(avatarId, 'test')
 
245
        return d.addCallback(_callback)
 
246
 
 
247
 
 
248
    def test_requestAvatarIdWithNotEnoughAuthentication(self):
 
249
        """
 
250
        If the client indicates that it is never satisfied, by always returning
 
251
        False from _areDone, then L{SSHProtocolChecker} should raise
 
252
        L{NotEnoughAuthentication}.
 
253
        """
 
254
        checker = SSHProtocolChecker()
 
255
        def _areDone(avatarId):
 
256
            return False
 
257
        self.patch(checker, 'areDone', _areDone)
 
258
 
 
259
        passwordDatabase = InMemoryUsernamePasswordDatabaseDontUse()
 
260
        passwordDatabase.addUser('test', 'test')
 
261
        checker.registerChecker(passwordDatabase)
 
262
        d = checker.requestAvatarId(UsernamePassword('test', 'test'))
 
263
        return self.assertFailure(d, NotEnoughAuthentication)
 
264
 
 
265
 
 
266
    def test_requestAvatarIdInvalidCredential(self):
 
267
        """
 
268
        If the passed credentials aren't handled by any registered checker,
 
269
        L{SSHProtocolChecker} should raise L{UnhandledCredentials}.
 
270
        """
 
271
        checker = SSHProtocolChecker()
 
272
        d = checker.requestAvatarId(UsernamePassword('test', 'test'))
 
273
        return self.assertFailure(d, UnhandledCredentials)
 
274
 
 
275
 
 
276
    def test_areDone(self):
 
277
        """
 
278
        The default L{SSHProtocolChecker.areDone} should simply return True.
 
279
        """
 
280
        self.assertEquals(SSHProtocolChecker().areDone(None), True)