1
# -*- test-case-name: twisted.conch.test.test_userauth -*-
2
# Copyright (c) 2007-2010 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Tests for the implementation of the ssh-userauth service.
8
Maintainer: Paul Swartz
11
from zope.interface import implements
13
from twisted.cred.checkers import ICredentialsChecker
14
from twisted.cred.credentials import IUsernamePassword, ISSHPrivateKey
15
from twisted.cred.credentials import IPluggableAuthenticationModules
16
from twisted.cred.credentials import IAnonymous
17
from twisted.cred.error import UnauthorizedLogin
18
from twisted.cred.portal import IRealm, Portal
19
from twisted.conch.error import ConchError, ValidPublicKey
20
from twisted.internet import defer, task
21
from twisted.protocols import loopback
22
from twisted.trial import unittest
25
import Crypto.Cipher.DES3, Crypto.Cipher.XOR
32
class SSHTransportBase:
34
A stub class so that later class definitions won't die.
38
class SSHUserAuthClient:
40
A stub class so that leter class definitions won't die.
43
from twisted.conch.ssh.common import NS
44
from twisted.conch.checkers import SSHProtocolChecker
45
from twisted.conch.ssh import keys, userauth, transport
46
from twisted.conch.test import keydata
50
class ClientUserAuth(userauth.SSHUserAuthClient):
52
A mock user auth client.
56
def getPublicKey(self):
58
If this is the first time we've been called, return a blob for
59
the DSA key. Otherwise, return a blob
62
if self.lastPublicKey:
63
return keys.Key.fromString(keydata.publicRSA_openssh)
65
return defer.succeed(keys.Key.fromString(keydata.publicDSA_openssh))
68
def getPrivateKey(self):
70
Return the private key object for the RSA key.
72
return defer.succeed(keys.Key.fromString(keydata.privateRSA_openssh))
75
def getPassword(self, prompt=None):
77
Return 'foo' as the password.
79
return defer.succeed('foo')
82
def getGenericAnswers(self, name, information, answers):
84
Return 'foo' as the answer to two questions.
86
return defer.succeed(('foo', 'foo'))
90
class OldClientAuth(userauth.SSHUserAuthClient):
92
The old SSHUserAuthClient returned a PyCrypto key object from
93
getPrivateKey() and a string from getPublicKey
97
def getPrivateKey(self):
98
return defer.succeed(keys.Key.fromString(
99
keydata.privateRSA_openssh).keyObject)
102
def getPublicKey(self):
103
return keys.Key.fromString(keydata.publicRSA_openssh).blob()
105
class ClientAuthWithoutPrivateKey(userauth.SSHUserAuthClient):
107
This client doesn't have a private key, but it does have a public key.
111
def getPrivateKey(self):
115
def getPublicKey(self):
116
return keys.Key.fromString(keydata.publicRSA_openssh)
120
class FakeTransport(transport.SSHTransportBase):
122
L{userauth.SSHUserAuthServer} expects an SSH transport which has a factory
123
attribute which has a portal attribute. Because the portal is important for
124
testing authentication, we need to be able to provide an interesting portal
125
object to the L{SSHUserAuthServer}.
127
In addition, we want to be able to capture any packets sent over the
130
@ivar packets: a list of 2-tuples: (messageType, data). Each 2-tuple is
132
@type packets: C{list}
133
@param lostConnecion: True if loseConnection has been called on us.
134
@type lostConnection: C{bool}
138
class Service(object):
140
A mock service, representing the other service offered by the server.
145
def serviceStarted(self):
150
class Factory(object):
152
A mock factory, representing the factory that spawned this user auth
157
def getService(self, transport, service):
159
Return our fake service.
161
if service == 'none':
162
return FakeTransport.Service
166
def __init__(self, portal):
167
self.factory = self.Factory()
168
self.factory.portal = portal
169
self.lostConnection = False
170
self.transport = self
175
def sendPacket(self, messageType, message):
177
Record the packet sent by the service.
179
self.packets.append((messageType, message))
182
def isEncrypted(self, direction):
184
Pretend that this transport encrypts traffic in both directions. The
185
SSHUserAuthServer disables password authentication if the transport
191
def loseConnection(self):
192
self.lostConnection = True
198
A mock realm for testing L{userauth.SSHUserAuthServer}.
200
This realm is not actually used in the course of testing, so it returns the
201
simplest thing that could possibly work.
206
def requestAvatar(self, avatarId, mind, *interfaces):
207
return defer.succeed((interfaces[0], None, lambda: None))
211
class PasswordChecker(object):
213
A very simple username/password checker which authenticates anyone whose
214
password matches their username and rejects all others.
216
credentialInterfaces = (IUsernamePassword,)
217
implements(ICredentialsChecker)
220
def requestAvatarId(self, creds):
221
if creds.username == creds.password:
222
return defer.succeed(creds.username)
223
return defer.fail(UnauthorizedLogin("Invalid username/password pair"))
227
class PrivateKeyChecker(object):
229
A very simple public key checker which authenticates anyone whose
230
public/private keypair is the same keydata.public/privateRSA_openssh.
232
credentialInterfaces = (ISSHPrivateKey,)
233
implements(ICredentialsChecker)
237
def requestAvatarId(self, creds):
238
if creds.blob == keys.Key.fromString(keydata.publicRSA_openssh).blob():
239
if creds.signature is not None:
240
obj = keys.Key.fromString(creds.blob)
241
if obj.verify(creds.signature, creds.sigData):
242
return creds.username
244
raise ValidPublicKey()
245
raise UnauthorizedLogin()
249
class PAMChecker(object):
251
A simple PAM checker which asks the user for a password, verifying them
252
if the password is the same as their username.
254
credentialInterfaces = (IPluggableAuthenticationModules,)
255
implements(ICredentialsChecker)
258
def requestAvatarId(self, creds):
259
d = creds.pamConversion([('Name: ', 2), ("Password: ", 1)])
261
if values == [(creds.username, 0), (creds.username, 0)]:
262
return creds.username
263
raise UnauthorizedLogin()
264
return d.addCallback(check)
268
class AnonymousChecker(object):
270
A simple checker which isn't supported by L{SSHUserAuthServer}.
272
credentialInterfaces = (IAnonymous,)
273
implements(ICredentialsChecker)
277
class SSHUserAuthServerTestCase(unittest.TestCase):
279
Tests for SSHUserAuthServer.
284
skip = "cannot run w/o PyCrypto"
289
self.portal = Portal(self.realm)
290
self.portal.registerChecker(PasswordChecker())
291
self.portal.registerChecker(PrivateKeyChecker())
292
self.portal.registerChecker(PAMChecker())
293
self.authServer = userauth.SSHUserAuthServer()
294
self.authServer.transport = FakeTransport(self.portal)
295
self.authServer.serviceStarted()
296
self.authServer.supportedAuthentications.sort() # give a consistent
301
self.authServer.serviceStopped()
302
self.authServer = None
305
def _checkFailed(self, ignored):
307
Check that the authentication has failed.
309
self.assertEquals(self.authServer.transport.packets[-1],
310
(userauth.MSG_USERAUTH_FAILURE,
311
NS('keyboard-interactive,password,publickey') + '\x00'))
314
def test_noneAuthentication(self):
316
A client may request a list of authentication 'method name' values
317
that may continue by using the "none" authentication 'method name'.
319
See RFC 4252 Section 5.2.
321
d = self.authServer.ssh_USERAUTH_REQUEST(NS('foo') + NS('service') +
323
return d.addCallback(self._checkFailed)
326
def test_successfulPasswordAuthentication(self):
328
When provided with correct password authentication information, the
329
server should respond by sending a MSG_USERAUTH_SUCCESS message with
332
See RFC 4252, Section 5.1.
334
packet = NS('foo') + NS('none') + NS('password') + chr(0) + NS('foo')
335
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
338
self.authServer.transport.packets,
339
[(userauth.MSG_USERAUTH_SUCCESS, '')])
340
return d.addCallback(check)
343
def test_failedPasswordAuthentication(self):
345
When provided with invalid authentication details, the server should
346
respond by sending a MSG_USERAUTH_FAILURE message which states whether
347
the authentication was partially successful, and provides other, open
348
options for authentication.
350
See RFC 4252, Section 5.1.
352
# packet = username, next_service, authentication type, FALSE, password
353
packet = NS('foo') + NS('none') + NS('password') + chr(0) + NS('bar')
354
self.authServer.clock = task.Clock()
355
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
356
self.assertEquals(self.authServer.transport.packets, [])
357
self.authServer.clock.advance(2)
358
return d.addCallback(self._checkFailed)
361
def test_successfulPrivateKeyAuthentication(self):
363
Test that private key authentication completes sucessfully,
365
blob = keys.Key.fromString(keydata.publicRSA_openssh).blob()
366
obj = keys.Key.fromString(keydata.privateRSA_openssh)
367
packet = (NS('foo') + NS('none') + NS('publickey') + '\xff'
368
+ NS(obj.sshType()) + NS(blob))
369
self.authServer.transport.sessionID = 'test'
370
signature = obj.sign(NS('test') + chr(userauth.MSG_USERAUTH_REQUEST)
372
packet += NS(signature)
373
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
375
self.assertEquals(self.authServer.transport.packets,
376
[(userauth.MSG_USERAUTH_SUCCESS, '')])
377
return d.addCallback(check)
380
def test_requestRaisesConchError(self):
382
ssh_USERAUTH_REQUEST should raise a ConchError if tryAuth returns
383
None. Added to catch a bug noticed by pyflakes.
387
def mockCbFinishedAuth(self, ignored):
388
self.fail('request should have raised ConochError')
390
def mockTryAuth(kind, user, data):
393
def mockEbBadAuth(reason):
394
d.errback(reason.value)
396
self.patch(self.authServer, 'tryAuth', mockTryAuth)
397
self.patch(self.authServer, '_cbFinishedAuth', mockCbFinishedAuth)
398
self.patch(self.authServer, '_ebBadAuth', mockEbBadAuth)
400
packet = NS('user') + NS('none') + NS('public-key') + NS('data')
401
# If an error other than ConchError is raised, this will trigger an
403
self.authServer.ssh_USERAUTH_REQUEST(packet)
404
return self.assertFailure(d, ConchError)
407
def test_verifyValidPrivateKey(self):
409
Test that verifying a valid private key works.
411
blob = keys.Key.fromString(keydata.publicRSA_openssh).blob()
412
packet = (NS('foo') + NS('none') + NS('publickey') + '\x00'
413
+ NS('ssh-rsa') + NS(blob))
414
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
416
self.assertEquals(self.authServer.transport.packets,
417
[(userauth.MSG_USERAUTH_PK_OK, NS('ssh-rsa') + NS(blob))])
418
return d.addCallback(check)
421
def test_failedPrivateKeyAuthenticationWithoutSignature(self):
423
Test that private key authentication fails when the public key
426
blob = keys.Key.fromString(keydata.publicDSA_openssh).blob()
427
packet = (NS('foo') + NS('none') + NS('publickey') + '\x00'
428
+ NS('ssh-dsa') + NS(blob))
429
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
430
return d.addCallback(self._checkFailed)
433
def test_failedPrivateKeyAuthenticationWithSignature(self):
435
Test that private key authentication fails when the public key
438
blob = keys.Key.fromString(keydata.publicRSA_openssh).blob()
439
obj = keys.Key.fromString(keydata.privateRSA_openssh)
440
packet = (NS('foo') + NS('none') + NS('publickey') + '\xff'
441
+ NS('ssh-rsa') + NS(blob) + NS(obj.sign(blob)))
442
self.authServer.transport.sessionID = 'test'
443
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
444
return d.addCallback(self._checkFailed)
447
def test_successfulPAMAuthentication(self):
449
Test that keyboard-interactive authentication succeeds.
451
packet = (NS('foo') + NS('none') + NS('keyboard-interactive')
453
response = '\x00\x00\x00\x02' + NS('foo') + NS('foo')
454
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
455
self.authServer.ssh_USERAUTH_INFO_RESPONSE(response)
457
self.assertEquals(self.authServer.transport.packets,
458
[(userauth.MSG_USERAUTH_INFO_REQUEST, (NS('') + NS('')
459
+ NS('') + '\x00\x00\x00\x02' + NS('Name: ') + '\x01'
460
+ NS('Password: ') + '\x00')),
461
(userauth.MSG_USERAUTH_SUCCESS, '')])
463
return d.addCallback(check)
466
def test_failedPAMAuthentication(self):
468
Test that keyboard-interactive authentication fails.
470
packet = (NS('foo') + NS('none') + NS('keyboard-interactive')
472
response = '\x00\x00\x00\x02' + NS('bar') + NS('bar')
473
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
474
self.authServer.ssh_USERAUTH_INFO_RESPONSE(response)
476
self.assertEquals(self.authServer.transport.packets[0],
477
(userauth.MSG_USERAUTH_INFO_REQUEST, (NS('') + NS('')
478
+ NS('') + '\x00\x00\x00\x02' + NS('Name: ') + '\x01'
479
+ NS('Password: ') + '\x00')))
480
return d.addCallback(check).addCallback(self._checkFailed)
483
def test_invalid_USERAUTH_INFO_RESPONSE_not_enough_data(self):
485
If ssh_USERAUTH_INFO_RESPONSE gets an invalid packet,
486
the user authentication should fail.
488
packet = (NS('foo') + NS('none') + NS('keyboard-interactive')
490
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
491
self.authServer.ssh_USERAUTH_INFO_RESPONSE(NS('\x00\x00\x00\x00' +
493
return d.addCallback(self._checkFailed)
496
def test_invalid_USERAUTH_INFO_RESPONSE_too_much_data(self):
498
If ssh_USERAUTH_INFO_RESPONSE gets too much data, the user
499
authentication should fail.
501
packet = (NS('foo') + NS('none') + NS('keyboard-interactive')
503
response = '\x00\x00\x00\x02' + NS('foo') + NS('foo') + NS('foo')
504
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
505
self.authServer.ssh_USERAUTH_INFO_RESPONSE(response)
506
return d.addCallback(self._checkFailed)
509
def test_onlyOnePAMAuthentication(self):
511
Because it requires an intermediate message, one can't send a second
512
keyboard-interactive request while the first is still pending.
514
packet = (NS('foo') + NS('none') + NS('keyboard-interactive')
516
self.authServer.ssh_USERAUTH_REQUEST(packet)
517
self.authServer.ssh_USERAUTH_REQUEST(packet)
518
self.assertEquals(self.authServer.transport.packets[-1][0],
519
transport.MSG_DISCONNECT)
520
self.assertEquals(self.authServer.transport.packets[-1][1][3],
521
chr(transport.DISCONNECT_PROTOCOL_ERROR))
524
def test_ignoreUnknownCredInterfaces(self):
526
L{SSHUserAuthServer} sets up
527
C{SSHUserAuthServer.supportedAuthentications} by checking the portal's
528
credentials interfaces and mapping them to SSH authentication method
529
strings. If the Portal advertises an interface that
530
L{SSHUserAuthServer} can't map, it should be ignored. This is a white
533
server = userauth.SSHUserAuthServer()
534
server.transport = FakeTransport(self.portal)
535
self.portal.registerChecker(AnonymousChecker())
536
server.serviceStarted()
537
server.serviceStopped()
538
server.supportedAuthentications.sort() # give a consistent order
539
self.assertEquals(server.supportedAuthentications,
540
['keyboard-interactive', 'password', 'publickey'])
543
def test_removePasswordIfUnencrypted(self):
545
Test that the userauth service does not advertise password
546
authentication if the password would be send in cleartext.
548
self.assertIn('password', self.authServer.supportedAuthentications)
550
clearAuthServer = userauth.SSHUserAuthServer()
551
clearAuthServer.transport = FakeTransport(self.portal)
552
clearAuthServer.transport.isEncrypted = lambda x: False
553
clearAuthServer.serviceStarted()
554
clearAuthServer.serviceStopped()
555
self.failIfIn('password', clearAuthServer.supportedAuthentications)
556
# only encrypt incoming (the direction the password is sent)
557
halfAuthServer = userauth.SSHUserAuthServer()
558
halfAuthServer.transport = FakeTransport(self.portal)
559
halfAuthServer.transport.isEncrypted = lambda x: x == 'in'
560
halfAuthServer.serviceStarted()
561
halfAuthServer.serviceStopped()
562
self.assertIn('password', halfAuthServer.supportedAuthentications)
565
def test_removeKeyboardInteractiveIfUnencrypted(self):
567
Test that the userauth service does not advertise keyboard-interactive
568
authentication if the password would be send in cleartext.
570
self.assertIn('keyboard-interactive',
571
self.authServer.supportedAuthentications)
573
clearAuthServer = userauth.SSHUserAuthServer()
574
clearAuthServer.transport = FakeTransport(self.portal)
575
clearAuthServer.transport.isEncrypted = lambda x: False
576
clearAuthServer.serviceStarted()
577
clearAuthServer.serviceStopped()
578
self.failIfIn('keyboard-interactive',
579
clearAuthServer.supportedAuthentications)
580
# only encrypt incoming (the direction the password is sent)
581
halfAuthServer = userauth.SSHUserAuthServer()
582
halfAuthServer.transport = FakeTransport(self.portal)
583
halfAuthServer.transport.isEncrypted = lambda x: x == 'in'
584
halfAuthServer.serviceStarted()
585
halfAuthServer.serviceStopped()
586
self.assertIn('keyboard-interactive',
587
halfAuthServer.supportedAuthentications)
590
def test_unencryptedConnectionWithoutPasswords(self):
592
If the L{SSHUserAuthServer} is not advertising passwords, then an
593
unencrypted connection should not cause any warnings or exceptions.
594
This is a white box test.
596
# create a Portal without password authentication
597
portal = Portal(self.realm)
598
portal.registerChecker(PrivateKeyChecker())
601
clearAuthServer = userauth.SSHUserAuthServer()
602
clearAuthServer.transport = FakeTransport(portal)
603
clearAuthServer.transport.isEncrypted = lambda x: False
604
clearAuthServer.serviceStarted()
605
clearAuthServer.serviceStopped()
606
self.assertEquals(clearAuthServer.supportedAuthentications,
609
# only encrypt incoming (the direction the password is sent)
610
halfAuthServer = userauth.SSHUserAuthServer()
611
halfAuthServer.transport = FakeTransport(portal)
612
halfAuthServer.transport.isEncrypted = lambda x: x == 'in'
613
halfAuthServer.serviceStarted()
614
halfAuthServer.serviceStopped()
615
self.assertEquals(clearAuthServer.supportedAuthentications,
619
def test_loginTimeout(self):
621
Test that the login times out.
623
timeoutAuthServer = userauth.SSHUserAuthServer()
624
timeoutAuthServer.clock = task.Clock()
625
timeoutAuthServer.transport = FakeTransport(self.portal)
626
timeoutAuthServer.serviceStarted()
627
timeoutAuthServer.clock.advance(11 * 60 * 60)
628
timeoutAuthServer.serviceStopped()
629
self.assertEquals(timeoutAuthServer.transport.packets,
630
[(transport.MSG_DISCONNECT,
632
chr(transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE) +
633
NS("you took too long") + NS(''))])
634
self.assertTrue(timeoutAuthServer.transport.lostConnection)
637
def test_cancelLoginTimeout(self):
639
Test that stopping the service also stops the login timeout.
641
timeoutAuthServer = userauth.SSHUserAuthServer()
642
timeoutAuthServer.clock = task.Clock()
643
timeoutAuthServer.transport = FakeTransport(self.portal)
644
timeoutAuthServer.serviceStarted()
645
timeoutAuthServer.serviceStopped()
646
timeoutAuthServer.clock.advance(11 * 60 * 60)
647
self.assertEquals(timeoutAuthServer.transport.packets, [])
648
self.assertFalse(timeoutAuthServer.transport.lostConnection)
651
def test_tooManyAttempts(self):
653
Test that the server disconnects if the client fails authentication
656
packet = NS('foo') + NS('none') + NS('password') + chr(0) + NS('bar')
657
self.authServer.clock = task.Clock()
659
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
660
self.authServer.clock.advance(2)
662
self.assertEquals(self.authServer.transport.packets[-1],
663
(transport.MSG_DISCONNECT,
665
chr(transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE) +
666
NS("too many bad auths") + NS('')))
667
return d.addCallback(check)
670
def test_failIfUnknownService(self):
672
If the user requests a service that we don't support, the
673
authentication should fail.
675
packet = NS('foo') + NS('') + NS('password') + chr(0) + NS('foo')
676
self.authServer.clock = task.Clock()
677
d = self.authServer.ssh_USERAUTH_REQUEST(packet)
678
return d.addCallback(self._checkFailed)
681
def test__pamConvErrors(self):
683
_pamConv should fail if it gets a message that's not 1 or 2.
685
def secondTest(ignored):
686
d2 = self.authServer._pamConv([('', 90)])
687
return self.assertFailure(d2, ConchError)
689
d = self.authServer._pamConv([('', 3)])
690
return self.assertFailure(d, ConchError).addCallback(secondTest)
693
def test_tryAuthEdgeCases(self):
695
tryAuth() has two edge cases that are difficult to reach.
697
1) an authentication method auth_* returns None instead of a Deferred.
698
2) an authentication type that is defined does not have a matching
701
Both these cases should return a Deferred which fails with a
704
def mockAuth(packet):
707
self.patch(self.authServer, 'auth_publickey', mockAuth) # first case
708
self.patch(self.authServer, 'auth_password', None) # second case
710
def secondTest(ignored):
711
d2 = self.authServer.tryAuth('password', None, None)
712
return self.assertFailure(d2, ConchError)
714
d1 = self.authServer.tryAuth('publickey', None, None)
715
return self.assertFailure(d1, ConchError).addCallback(secondTest)
720
class SSHUserAuthClientTestCase(unittest.TestCase):
722
Tests for SSHUserAuthClient.
727
skip = "cannot run w/o PyCrypto"
731
self.authClient = ClientUserAuth('foo', FakeTransport.Service())
732
self.authClient.transport = FakeTransport(None)
733
self.authClient.transport.sessionID = 'test'
734
self.authClient.serviceStarted()
738
self.authClient.serviceStopped()
739
self.authClient = None
744
Test that client is initialized properly.
746
self.assertEquals(self.authClient.user, 'foo')
747
self.assertEquals(self.authClient.instance.name, 'nancy')
748
self.assertEquals(self.authClient.transport.packets,
749
[(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
753
def test_USERAUTH_SUCCESS(self):
755
Test that the client succeeds properly.
758
def stubSetService(service):
759
instance[0] = service
760
self.authClient.transport.setService = stubSetService
761
self.authClient.ssh_USERAUTH_SUCCESS('')
762
self.assertEquals(instance[0], self.authClient.instance)
765
def test_publickey(self):
767
Test that the client can authenticate with a public key.
769
self.authClient.ssh_USERAUTH_FAILURE(NS('publickey') + '\x00')
770
self.assertEquals(self.authClient.transport.packets[-1],
771
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
772
+ NS('publickey') + '\x00' + NS('ssh-dss')
773
+ NS(keys.Key.fromString(
774
keydata.publicDSA_openssh).blob())))
775
# that key isn't good
776
self.authClient.ssh_USERAUTH_FAILURE(NS('publickey') + '\x00')
777
blob = NS(keys.Key.fromString(keydata.publicRSA_openssh).blob())
778
self.assertEquals(self.authClient.transport.packets[-1],
779
(userauth.MSG_USERAUTH_REQUEST, (NS('foo') + NS('nancy')
780
+ NS('publickey') + '\x00'+ NS('ssh-rsa') + blob)))
781
self.authClient.ssh_USERAUTH_PK_OK(NS('ssh-rsa')
782
+ NS(keys.Key.fromString(keydata.publicRSA_openssh).blob()))
783
sigData = (NS(self.authClient.transport.sessionID)
784
+ chr(userauth.MSG_USERAUTH_REQUEST) + NS('foo')
785
+ NS('nancy') + NS('publickey') + '\xff' + NS('ssh-rsa')
787
obj = keys.Key.fromString(keydata.privateRSA_openssh)
788
self.assertEquals(self.authClient.transport.packets[-1],
789
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
790
+ NS('publickey') + '\xff' + NS('ssh-rsa') + blob
791
+ NS(obj.sign(sigData))))
794
def test_publickey_without_privatekey(self):
796
If the SSHUserAuthClient doesn't return anything from signData,
797
the client should start the authentication over again by requesting
798
'none' authentication.
800
authClient = ClientAuthWithoutPrivateKey('foo',
801
FakeTransport.Service())
803
authClient.transport = FakeTransport(None)
804
authClient.transport.sessionID = 'test'
805
authClient.serviceStarted()
806
authClient.tryAuth('publickey')
807
authClient.transport.packets = []
808
self.assertIdentical(authClient.ssh_USERAUTH_PK_OK(''), None)
809
self.assertEquals(authClient.transport.packets, [
810
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy') +
814
def test_old_publickey_getPublicKey(self):
816
Old SSHUserAuthClients returned strings of public key blobs from
817
getPublicKey(). Test that a Deprecation warning is raised but the key is
820
oldAuth = OldClientAuth('foo', FakeTransport.Service())
821
oldAuth.transport = FakeTransport(None)
822
oldAuth.transport.sessionID = 'test'
823
oldAuth.serviceStarted()
824
oldAuth.transport.packets = []
825
self.assertWarns(DeprecationWarning, "Returning a string from "
826
"SSHUserAuthClient.getPublicKey() is deprecated since "
827
"Twisted 9.0. Return a keys.Key() instead.",
828
userauth.__file__, oldAuth.tryAuth, 'publickey')
829
self.assertEquals(oldAuth.transport.packets, [
830
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy') +
831
NS('publickey') + '\x00' + NS('ssh-rsa') +
832
NS(keys.Key.fromString(keydata.publicRSA_openssh).blob()))])
835
def test_old_publickey_getPrivateKey(self):
837
Old SSHUserAuthClients returned a PyCrypto key object from
838
getPrivateKey(). Test that _cbSignData signs the data warns the
839
user about the deprecation, but signs the data correctly.
841
oldAuth = OldClientAuth('foo', FakeTransport.Service())
842
d = self.assertWarns(DeprecationWarning, "Returning a PyCrypto key "
843
"object from SSHUserAuthClient.getPrivateKey() is "
844
"deprecated since Twisted 9.0. "
845
"Return a keys.Key() instead.", userauth.__file__,
846
oldAuth.signData, None, 'data')
847
def _checkSignedData(sig):
848
self.assertEquals(sig,
849
keys.Key.fromString(keydata.privateRSA_openssh).sign(
851
d.addCallback(_checkSignedData)
855
def test_no_publickey(self):
857
If there's no public key, auth_publickey should return a Deferred
858
called back with a False value.
860
self.authClient.getPublicKey = lambda x: None
861
d = self.authClient.tryAuth('publickey')
863
self.assertFalse(result)
864
return d.addCallback(check)
866
def test_password(self):
868
Test that the client can authentication with a password. This
869
includes changing the password.
871
self.authClient.ssh_USERAUTH_FAILURE(NS('password') + '\x00')
872
self.assertEquals(self.authClient.transport.packets[-1],
873
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
874
+ NS('password') + '\x00' + NS('foo')))
875
self.authClient.ssh_USERAUTH_PK_OK(NS('') + NS(''))
876
self.assertEquals(self.authClient.transport.packets[-1],
877
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
878
+ NS('password') + '\xff' + NS('foo') * 2))
881
def test_no_password(self):
883
If getPassword returns None, tryAuth should return False.
885
self.authClient.getPassword = lambda: None
886
self.assertFalse(self.authClient.tryAuth('password'))
889
def test_keyboardInteractive(self):
891
Test that the client can authenticate using keyboard-interactive
894
self.authClient.ssh_USERAUTH_FAILURE(NS('keyboard-interactive')
896
self.assertEquals(self.authClient.transport.packets[-1],
897
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
898
+ NS('keyboard-interactive') + NS('')*2))
899
self.authClient.ssh_USERAUTH_PK_OK(NS('')*3 + '\x00\x00\x00\x02'
900
+ NS('Name: ') + '\xff' + NS('Password: ') + '\x00')
901
self.assertEquals(self.authClient.transport.packets[-1],
902
(userauth.MSG_USERAUTH_INFO_RESPONSE, '\x00\x00\x00\x02'
906
def test_USERAUTH_PK_OK_unknown_method(self):
908
If C{SSHUserAuthClient} gets a MSG_USERAUTH_PK_OK packet when it's not
909
expecting it, it should fail the current authentication and move on to
912
self.authClient.lastAuth = 'unknown'
913
self.authClient.transport.packets = []
914
self.authClient.ssh_USERAUTH_PK_OK('')
915
self.assertEquals(self.authClient.transport.packets,
916
[(userauth.MSG_USERAUTH_REQUEST, NS('foo') +
917
NS('nancy') + NS('none'))])
920
def test_USERAUTH_FAILURE_sorting(self):
922
ssh_USERAUTH_FAILURE should sort the methods by their position
923
in SSHUserAuthClient.preferredOrder. Methods that are not in
924
preferredOrder should be sorted at the end of that list.
926
def auth_firstmethod():
927
self.authClient.transport.sendPacket(255, 'here is data')
928
def auth_anothermethod():
929
self.authClient.transport.sendPacket(254, 'other data')
931
self.authClient.auth_firstmethod = auth_firstmethod
932
self.authClient.auth_anothermethod = auth_anothermethod
934
# although they shouldn't get called, method callbacks auth_* MUST
935
# exist in order for the test to work properly.
936
self.authClient.ssh_USERAUTH_FAILURE(NS('anothermethod,password') +
938
# should send password packet
939
self.assertEquals(self.authClient.transport.packets[-1],
940
(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
941
+ NS('password') + '\x00' + NS('foo')))
942
self.authClient.ssh_USERAUTH_FAILURE(
943
NS('firstmethod,anothermethod,password') + '\xff')
944
self.assertEquals(self.authClient.transport.packets[-2:],
945
[(255, 'here is data'), (254, 'other data')])
948
def test_disconnectIfNoMoreAuthentication(self):
950
If there are no more available user authentication messages,
951
the SSHUserAuthClient should disconnect with code
952
DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE.
954
self.authClient.ssh_USERAUTH_FAILURE(NS('password') + '\x00')
955
self.authClient.ssh_USERAUTH_FAILURE(NS('password') + '\xff')
956
self.assertEquals(self.authClient.transport.packets[-1],
957
(transport.MSG_DISCONNECT, '\x00\x00\x00\x0e' +
958
NS('no more authentication methods available') +
962
def test_ebAuth(self):
964
_ebAuth (the generic authentication error handler) should send
965
a request for the 'none' authentication method.
967
self.authClient.transport.packets = []
968
self.authClient._ebAuth(None)
969
self.assertEquals(self.authClient.transport.packets,
970
[(userauth.MSG_USERAUTH_REQUEST, NS('foo') + NS('nancy')
974
def test_defaults(self):
976
getPublicKey() should return None. getPrivateKey() should return a
977
failed Deferred. getPassword() should return a failed Deferred.
978
getGenericAnswers() should return a failed Deferred.
980
authClient = userauth.SSHUserAuthClient('foo', FakeTransport.Service())
981
self.assertIdentical(authClient.getPublicKey(), None)
983
result.trap(NotImplementedError)
984
d = authClient.getPassword()
985
return d.addCallback(self.fail).addErrback(check2)
987
result.trap(NotImplementedError)
988
d = authClient.getGenericAnswers(None, None, None)
989
return d.addCallback(self.fail).addErrback(check3)
991
result.trap(NotImplementedError)
992
d = authClient.getPrivateKey()
993
return d.addCallback(self.fail).addErrback(check)
997
class LoopbackTestCase(unittest.TestCase):
1001
skip = "cannot run w/o PyCrypto or PyASN1"
1006
name = 'TestService'
1009
def serviceStarted(self):
1010
self.transport.loseConnection()
1013
def serviceStopped(self):
1017
def getService(self, avatar, name):
1021
def test_loopback(self):
1023
Test that the userauth server and client play nicely with each other.
1025
server = userauth.SSHUserAuthServer()
1026
client = ClientUserAuth('foo', self.Factory.Service())
1029
server.transport = transport.SSHTransportBase()
1030
server.transport.service = server
1031
server.transport.isEncrypted = lambda x: True
1032
client.transport = transport.SSHTransportBase()
1033
client.transport.service = client
1034
server.transport.sessionID = client.transport.sessionID = ''
1035
# don't send key exchange packet
1036
server.transport.sendKexInit = client.transport.sendKexInit = \
1039
# set up server authentication
1040
server.transport.factory = self.Factory()
1041
server.passwordDelay = 0 # remove bad password delay
1043
portal = Portal(realm)
1044
checker = SSHProtocolChecker()
1045
checker.registerChecker(PasswordChecker())
1046
checker.registerChecker(PrivateKeyChecker())
1047
checker.registerChecker(PAMChecker())
1048
checker.areDone = lambda aId: (
1049
len(checker.successfulCredentials[aId]) == 3)
1050
portal.registerChecker(checker)
1051
server.transport.factory.portal = portal
1053
d = loopback.loopbackAsync(server.transport, client.transport)
1054
server.transport.transport.logPrefix = lambda: '_ServerLoopback'
1055
client.transport.transport.logPrefix = lambda: '_ClientLoopback'
1057
server.serviceStarted()
1058
client.serviceStarted()
1061
self.assertEquals(server.transport.service.name, 'TestService')
1062
return d.addCallback(check)