~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/conch/test/test_default.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.conch.client.default}.
 
6
"""
 
7
try:
 
8
    import Crypto.Cipher.DES3
 
9
    import pyasn1
 
10
except ImportError:
 
11
    skip = "PyCrypto and PyASN1 required for twisted.conch.client.default."
 
12
else:
 
13
    from twisted.conch.client.agent import SSHAgentClient
 
14
    from twisted.conch.client.default import SSHUserAuthClient
 
15
    from twisted.conch.client.options import ConchOptions
 
16
    from twisted.conch.ssh.keys import Key
 
17
 
 
18
 
 
19
from twisted.trial.unittest import TestCase
 
20
from twisted.python.filepath import FilePath
 
21
from twisted.conch.test import keydata
 
22
from twisted.test.proto_helpers import StringTransport
 
23
 
 
24
 
 
25
 
 
26
class SSHUserAuthClientTest(TestCase):
 
27
    """
 
28
    Tests for L{SSHUserAuthClient}.
 
29
 
 
30
    @type rsaPublic: L{Key}
 
31
    @ivar rsaPublic: A public RSA key.
 
32
    """
 
33
 
 
34
    def setUp(self):
 
35
        self.rsaPublic = Key.fromString(keydata.publicRSA_openssh)
 
36
        self.tmpdir = FilePath(self.mktemp())
 
37
        self.tmpdir.makedirs()
 
38
        self.rsaFile = self.tmpdir.child('id_rsa')
 
39
        self.rsaFile.setContent(keydata.privateRSA_openssh)
 
40
        self.tmpdir.child('id_rsa.pub').setContent(keydata.publicRSA_openssh)
 
41
 
 
42
 
 
43
    def test_signDataWithAgent(self):
 
44
        """
 
45
        When connected to an agent, L{SSHUserAuthClient} can use it to
 
46
        request signatures of particular data with a particular L{Key}.
 
47
        """
 
48
        client = SSHUserAuthClient("user", ConchOptions(), None)
 
49
        agent = SSHAgentClient()
 
50
        transport = StringTransport()
 
51
        agent.makeConnection(transport)
 
52
        client.keyAgent = agent
 
53
        cleartext = "Sign here"
 
54
        client.signData(self.rsaPublic, cleartext)
 
55
        self.assertEquals(
 
56
            transport.value(),
 
57
            "\x00\x00\x00\x8b\r\x00\x00\x00u" + self.rsaPublic.blob() +
 
58
            "\x00\x00\x00\t" + cleartext +
 
59
            "\x00\x00\x00\x00")
 
60
 
 
61
 
 
62
    def test_agentGetPublicKey(self):
 
63
        """
 
64
        L{SSHUserAuthClient} looks up public keys from the agent using the
 
65
        L{SSHAgentClient} class.  That L{SSHAgentClient.getPublicKey} returns a
 
66
        L{Key} object with one of the public keys in the agent.  If no more
 
67
        keys are present, it returns C{None}.
 
68
        """
 
69
        agent = SSHAgentClient()
 
70
        agent.blobs = [self.rsaPublic.blob()]
 
71
        key = agent.getPublicKey()
 
72
        self.assertEquals(key.isPublic(), True)
 
73
        self.assertEquals(key, self.rsaPublic)
 
74
        self.assertEquals(agent.getPublicKey(), None)
 
75
 
 
76
 
 
77
    def test_getPublicKeyFromFile(self):
 
78
        """
 
79
        L{SSHUserAuthClient.getPublicKey()} is able to get a public key from
 
80
        the first file described by its options' C{identitys} list, and return
 
81
        the corresponding public L{Key} object.
 
82
        """
 
83
        options = ConchOptions()
 
84
        options.identitys = [self.rsaFile.path]
 
85
        client = SSHUserAuthClient("user",  options, None)
 
86
        key = client.getPublicKey()
 
87
        self.assertEquals(key.isPublic(), True)
 
88
        self.assertEquals(key, self.rsaPublic)
 
89
 
 
90
 
 
91
    def test_getPublicKeyAgentFallback(self):
 
92
        """
 
93
        If an agent is present, but doesn't return a key,
 
94
        L{SSHUserAuthClient.getPublicKey} continue with the normal key lookup.
 
95
        """
 
96
        options = ConchOptions()
 
97
        options.identitys = [self.rsaFile.path]
 
98
        agent = SSHAgentClient()
 
99
        client = SSHUserAuthClient("user",  options, None)
 
100
        client.keyAgent = agent
 
101
        key = client.getPublicKey()
 
102
        self.assertEquals(key.isPublic(), True)
 
103
        self.assertEquals(key, self.rsaPublic)
 
104
 
 
105
 
 
106
    def test_getPublicKeyBadKeyError(self):
 
107
        """
 
108
        If L{keys.Key.fromFile} raises a L{keys.BadKeyError}, the
 
109
        L{SSHUserAuthClient.getPublicKey} tries again to get a public key by
 
110
        calling itself recursively.
 
111
        """
 
112
        options = ConchOptions()
 
113
        self.tmpdir.child('id_dsa.pub').setContent(keydata.publicDSA_openssh)
 
114
        dsaFile = self.tmpdir.child('id_dsa')
 
115
        dsaFile.setContent(keydata.privateDSA_openssh)
 
116
        options.identitys = [self.rsaFile.path, dsaFile.path]
 
117
        self.tmpdir.child('id_rsa.pub').setContent('not a key!')
 
118
        client = SSHUserAuthClient("user",  options, None)
 
119
        key = client.getPublicKey()
 
120
        self.assertEquals(key.isPublic(), True)
 
121
        self.assertEquals(key, Key.fromString(keydata.publicDSA_openssh))
 
122
        self.assertEquals(client.usedFiles, [self.rsaFile.path, dsaFile.path])
 
123
 
 
124
 
 
125
    def test_getPrivateKey(self):
 
126
        """
 
127
        L{SSHUserAuthClient.getPrivateKey} will load a private key from the
 
128
        last used file populated by L{SSHUserAuthClient.getPublicKey}, and
 
129
        return a L{Deferred} which fires with the corresponding private L{Key}.
 
130
        """
 
131
        rsaPrivate = Key.fromString(keydata.privateRSA_openssh)
 
132
        options = ConchOptions()
 
133
        options.identitys = [self.rsaFile.path]
 
134
        client = SSHUserAuthClient("user",  options, None)
 
135
        # Populate the list of used files
 
136
        client.getPublicKey()
 
137
 
 
138
        def _cbGetPrivateKey(key):
 
139
            self.assertEquals(key.isPublic(), False)
 
140
            self.assertEquals(key, rsaPrivate)
 
141
 
 
142
        return client.getPrivateKey().addCallback(_cbGetPrivateKey)
 
143
 
 
144
 
 
145
    def test_getPrivateKeyPassphrase(self):
 
146
        """
 
147
        L{SSHUserAuthClient} can get a private key from a file, and return a
 
148
        Deferred called back with a private L{Key} object, even if the key is
 
149
        encrypted.
 
150
        """
 
151
        rsaPrivate = Key.fromString(keydata.privateRSA_openssh)
 
152
        passphrase = 'this is the passphrase'
 
153
        self.rsaFile.setContent(rsaPrivate.toString('openssh', passphrase))
 
154
        options = ConchOptions()
 
155
        options.identitys = [self.rsaFile.path]
 
156
        client = SSHUserAuthClient("user",  options, None)
 
157
        # Populate the list of used files
 
158
        client.getPublicKey()
 
159
 
 
160
        def _getPassword(prompt):
 
161
            self.assertEquals(prompt,
 
162
                              "Enter passphrase for key '%s': " % (
 
163
                              self.rsaFile.path,))
 
164
            return passphrase
 
165
 
 
166
        def _cbGetPrivateKey(key):
 
167
            self.assertEquals(key.isPublic(), False)
 
168
            self.assertEquals(key, rsaPrivate)
 
169
 
 
170
        self.patch(client, '_getPassword', _getPassword)
 
171
        return client.getPrivateKey().addCallback(_cbGetPrivateKey)