~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/conch/test/test_knownhosts.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2008 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.conch.client.knownhosts}.
 
6
"""
 
7
 
 
8
import os
 
9
from binascii import Error as BinasciiError, b2a_base64, a2b_base64
 
10
 
 
11
try:
 
12
    import Crypto
 
13
    import pyasn1
 
14
except ImportError:
 
15
    skip = "PyCrypto and PyASN1 required for twisted.conch.knownhosts."
 
16
else:
 
17
    from twisted.conch.ssh.keys import Key, BadKeyError
 
18
    from twisted.conch.client.knownhosts import \
 
19
        PlainEntry, HashedEntry, KnownHostsFile, UnparsedEntry, ConsoleUI
 
20
    from twisted.conch.client import default
 
21
 
 
22
from zope.interface.verify import verifyObject
 
23
 
 
24
from twisted.python.filepath import FilePath
 
25
from twisted.trial.unittest import TestCase
 
26
from twisted.internet.defer import Deferred
 
27
from twisted.conch.interfaces import IKnownHostEntry
 
28
from twisted.conch.error import HostKeyChanged, UserRejectedKey, InvalidEntry
 
29
 
 
30
 
 
31
sampleEncodedKey = (
 
32
    'AAAAB3NzaC1yc2EAAAABIwAAAQEAsV0VMRbGmzhqxxayLRHmvnFvtyNqgbNKV46dU1bVFB+3y'
 
33
    'tNvue4Riqv/SVkPRNwMb7eWH29SviXaBxUhYyzKkDoNUq3rTNnH1Vnif6d6X4JCrUb5d3W+Dm'
 
34
    'YClyJrZ5HgD/hUpdSkTRqdbQ2TrvSAxRacj+vHHT4F4dm1bJSewm3B2D8HVOoi/CbVh3dsIiC'
 
35
    'dp8VltdZx4qYVfYe2LwVINCbAa3d3tj9ma7RVfw3OH2Mfb+toLd1N5tBQFb7oqTt2nC6I/6Bd'
 
36
    '4JwPUld+IEitw/suElq/AIJVQXXujeyiZlea90HE65U2mF1ytr17HTAIT2ySokJWyuBANGACk'
 
37
    '6iIaw==')
 
38
 
 
39
otherSampleEncodedKey = (
 
40
    'AAAAB3NzaC1yc2EAAAABIwAAAIEAwaeCZd3UCuPXhX39+/p9qO028jTF76DMVd9mPvYVDVXuf'
 
41
    'WckKZauF7+0b7qm+ChT7kan6BzRVo4++gCVNfAlMzLysSt3ylmOR48tFpAfygg9UCX3DjHz0E'
 
42
    'lOOUKh3iifc9aUShD0OPaK3pR5JJ8jfiBfzSYWt/hDi/iZ4igsSs8=')
 
43
 
 
44
thirdSampleEncodedKey = (
 
45
    'AAAAB3NzaC1yc2EAAAABIwAAAQEAl/TQakPkePlnwCBRPitIVUTg6Z8VzN1en+DGkyo/evkmLw'
 
46
    '7o4NWR5qbysk9A9jXW332nxnEuAnbcCam9SHe1su1liVfyIK0+3bdn0YRB0sXIbNEtMs2LtCho'
 
47
    '/aV3cXPS+Cf1yut3wvIpaRnAzXxuKPCTXQ7/y0IXa8TwkRBH58OJa3RqfQ/NsSp5SAfdsrHyH2'
 
48
    'aitiVKm2jfbTKzSEqOQG/zq4J9GXTkq61gZugory/Tvl5/yPgSnOR6C9jVOMHf27ZPoRtyj9SY'
 
49
    '343Hd2QHiIE0KPZJEgCynKeWoKz8v6eTSK8n4rBnaqWdp8MnGZK1WGy05MguXbyCDuTC8AmJXQ'
 
50
    '==')
 
51
 
 
52
sampleKey = a2b_base64(sampleEncodedKey)
 
53
otherSampleKey = a2b_base64(otherSampleEncodedKey)
 
54
thirdSampleKey = a2b_base64(thirdSampleEncodedKey)
 
55
 
 
56
samplePlaintextLine = (
 
57
    "www.twistedmatrix.com ssh-rsa " + sampleEncodedKey + "\n")
 
58
 
 
59
otherSamplePlaintextLine = (
 
60
    "divmod.com ssh-rsa " + otherSampleEncodedKey + "\n")
 
61
 
 
62
sampleHostIPLine = (
 
63
    "www.twistedmatrix.com,198.49.126.131 ssh-rsa " + sampleEncodedKey + "\n")
 
64
 
 
65
sampleHashedLine = (
 
66
    "|1|gJbSEPBG9ZSBoZpHNtZBD1bHKBA=|bQv+0Xa0dByrwkA1EB0E7Xop/Fo= ssh-rsa " +
 
67
    sampleEncodedKey + "\n")
 
68
 
 
69
 
 
70
 
 
71
class EntryTestsMixin:
 
72
    """
 
73
    Tests for implementations of L{IKnownHostEntry}.  Subclasses must set the
 
74
    'entry' attribute to a provider of that interface, the implementation of
 
75
    that interface under test.
 
76
 
 
77
    @ivar entry: a provider of L{IKnownHostEntry} with a hostname of
 
78
    www.twistedmatrix.com and an RSA key of sampleKey.
 
79
    """
 
80
 
 
81
    def test_providesInterface(self):
 
82
        """
 
83
        The given entry should provide IKnownHostEntry.
 
84
        """
 
85
        verifyObject(IKnownHostEntry, self.entry)
 
86
 
 
87
 
 
88
    def test_fromString(self):
 
89
        """
 
90
        Constructing a plain text entry from an unhashed known_hosts entry will
 
91
        result in an L{IKnownHostEntry} provider with 'keyString', 'hostname',
 
92
        and 'keyType' attributes.  While outside the interface in question,
 
93
        these attributes are held in common by L{PlainEntry} and L{HashedEntry}
 
94
        implementations; other implementations should override this method in
 
95
        subclasses.
 
96
        """
 
97
        entry = self.entry
 
98
        self.assertEqual(entry.publicKey, Key.fromString(sampleKey))
 
99
        self.assertEqual(entry.keyType, "ssh-rsa")
 
100
 
 
101
 
 
102
    def test_matchesKey(self):
 
103
        """
 
104
        L{IKnownHostEntry.matchesKey} checks to see if an entry matches a given
 
105
        SSH key.
 
106
        """
 
107
        twistedmatrixDotCom = Key.fromString(sampleKey)
 
108
        divmodDotCom = Key.fromString(otherSampleKey)
 
109
        self.assertEqual(
 
110
            True,
 
111
            self.entry.matchesKey(twistedmatrixDotCom))
 
112
        self.assertEqual(
 
113
            False,
 
114
            self.entry.matchesKey(divmodDotCom))
 
115
 
 
116
 
 
117
    def test_matchesHost(self):
 
118
        """
 
119
        L{IKnownHostEntry.matchesHost} checks to see if an entry matches a
 
120
        given hostname.
 
121
        """
 
122
        self.assertEqual(True, self.entry.matchesHost(
 
123
                "www.twistedmatrix.com"))
 
124
        self.assertEqual(False, self.entry.matchesHost(
 
125
                "www.divmod.com"))
 
126
 
 
127
 
 
128
 
 
129
class PlainEntryTests(EntryTestsMixin, TestCase):
 
130
    """
 
131
    Test cases for L{PlainEntry}.
 
132
    """
 
133
    plaintextLine = samplePlaintextLine
 
134
    hostIPLine = sampleHostIPLine
 
135
 
 
136
    def setUp(self):
 
137
        """
 
138
        Set 'entry' to a sample plain-text entry with sampleKey as its key.
 
139
        """
 
140
        self.entry = PlainEntry.fromString(self.plaintextLine)
 
141
 
 
142
 
 
143
    def test_matchesHostIP(self):
 
144
        """
 
145
        A "hostname,ip" formatted line will match both the host and the IP.
 
146
        """
 
147
        self.entry = PlainEntry.fromString(self.hostIPLine)
 
148
        self.assertEqual(True, self.entry.matchesHost("198.49.126.131"))
 
149
        self.test_matchesHost()
 
150
 
 
151
 
 
152
    def test_toString(self):
 
153
        """
 
154
        L{PlainEntry.toString} generates the serialized OpenSSL format string
 
155
        for the entry, sans newline.
 
156
        """
 
157
        self.assertEqual(self.entry.toString(), self.plaintextLine.rstrip("\n"))
 
158
        multiHostEntry = PlainEntry.fromString(self.hostIPLine)
 
159
        self.assertEqual(multiHostEntry.toString(), self.hostIPLine.rstrip("\n"))
 
160
 
 
161
 
 
162
 
 
163
class PlainTextWithCommentTests(PlainEntryTests):
 
164
    """
 
165
    Test cases for L{PlainEntry} when parsed from a line with a comment.
 
166
    """
 
167
 
 
168
    plaintextLine = samplePlaintextLine[:-1] + " plain text comment.\n"
 
169
    hostIPLine = sampleHostIPLine[:-1] + " text following host/IP line\n"
 
170
 
 
171
 
 
172
 
 
173
class HashedEntryTests(EntryTestsMixin, TestCase):
 
174
    """
 
175
    Tests for L{HashedEntry}.
 
176
 
 
177
    This suite doesn't include any tests for host/IP pairs because hashed
 
178
    entries store IP addresses the same way as hostnames and does not support
 
179
    comma-separated lists.  (If you hash the IP and host together you can't
 
180
    tell if you've got the key already for one or the other.)
 
181
    """
 
182
    hashedLine = sampleHashedLine
 
183
 
 
184
    def setUp(self):
 
185
        """
 
186
        Set 'entry' to a sample hashed entry for twistedmatrix.com with
 
187
        sampleKey as its key.
 
188
        """
 
189
        self.entry = HashedEntry.fromString(self.hashedLine)
 
190
 
 
191
 
 
192
    def test_toString(self):
 
193
        """
 
194
        L{HashedEntry.toString} generates the serialized OpenSSL format string
 
195
        for the entry, sans the newline.
 
196
        """
 
197
        self.assertEqual(self.entry.toString(), self.hashedLine.rstrip("\n"))
 
198
 
 
199
 
 
200
 
 
201
class HashedEntryWithCommentTests(HashedEntryTests):
 
202
    """
 
203
    Test cases for L{PlainEntry} when parsed from a line with a comment.
 
204
    """
 
205
 
 
206
    hashedLine = sampleHashedLine[:-1] + " plain text comment.\n"
 
207
 
 
208
 
 
209
 
 
210
class UnparsedEntryTests(TestCase, EntryTestsMixin):
 
211
    """
 
212
    Tests for L{UnparsedEntry}
 
213
    """
 
214
    def setUp(self):
 
215
        """
 
216
        Set up the 'entry' to be an unparsed entry for some random text.
 
217
        """
 
218
        self.entry = UnparsedEntry("    This is a bogus entry.  \n")
 
219
 
 
220
 
 
221
    def test_fromString(self):
 
222
        """
 
223
        Creating an L{UnparsedEntry} should simply record the string it was
 
224
        passed.
 
225
        """
 
226
        self.assertEqual("    This is a bogus entry.  \n",
 
227
                         self.entry._string)
 
228
 
 
229
 
 
230
    def test_matchesHost(self):
 
231
        """
 
232
        An unparsed entry can't match any hosts.
 
233
        """
 
234
        self.assertEqual(False, self.entry.matchesHost("www.twistedmatrix.com"))
 
235
 
 
236
 
 
237
    def test_matchesKey(self):
 
238
        """
 
239
        An unparsed entry can't match any keys.
 
240
        """
 
241
        self.assertEqual(False, self.entry.matchesKey(Key.fromString(sampleKey)))
 
242
 
 
243
 
 
244
    def test_toString(self):
 
245
        """
 
246
        L{UnparsedEntry.toString} returns its input string, sans trailing newline.
 
247
        """
 
248
        self.assertEqual("    This is a bogus entry.  ", self.entry.toString())
 
249
 
 
250
 
 
251
 
 
252
class ParseErrorTests(TestCase):
 
253
    """
 
254
    L{HashedEntry.fromString} and L{PlainEntry.fromString} can raise a variety
 
255
    of errors depending on misformattings of certain strings.  These tests make
 
256
    sure those errors are caught.  Since many of the ways that this can go
 
257
    wrong are in the lower-level APIs being invoked by the parsing logic,
 
258
    several of these are integration tests with the L{base64} and
 
259
    L{twisted.conch.ssh.keys} modules.
 
260
    """
 
261
 
 
262
    def invalidEntryTest(self, cls):
 
263
        """
 
264
        If there are fewer than three elements, C{fromString} should raise
 
265
        L{InvalidEntry}.
 
266
        """
 
267
        self.assertRaises(InvalidEntry, cls.fromString, "invalid")
 
268
 
 
269
 
 
270
    def notBase64Test(self, cls):
 
271
        """
 
272
        If the key is not base64, C{fromString} should raise L{BinasciiError}.
 
273
        """
 
274
        self.assertRaises(BinasciiError, cls.fromString, "x x x")
 
275
 
 
276
 
 
277
    def badKeyTest(self, cls, prefix):
 
278
        """
 
279
        If the key portion of the entry is valid base64, but is not actually an
 
280
        SSH key, C{fromString} should raise L{BadKeyError}.
 
281
        """
 
282
        self.assertRaises(BadKeyError, cls.fromString, ' '.join(
 
283
                [prefix, "ssh-rsa", b2a_base64(
 
284
                        "Hey, this isn't an SSH key!").strip()]))
 
285
 
 
286
 
 
287
    def test_invalidPlainEntry(self):
 
288
        """
 
289
        If there are fewer than three whitespace-separated elements in an
 
290
        entry, L{PlainEntry.fromString} should raise L{InvalidEntry}.
 
291
        """
 
292
        self.invalidEntryTest(PlainEntry)
 
293
 
 
294
 
 
295
    def test_invalidHashedEntry(self):
 
296
        """
 
297
        If there are fewer than three whitespace-separated elements in an
 
298
        entry, or the hostname salt/hash portion has more than two elements,
 
299
        L{HashedEntry.fromString} should raise L{InvalidEntry}.
 
300
        """
 
301
        self.invalidEntryTest(HashedEntry)
 
302
        a, b, c = sampleHashedLine.split()
 
303
        self.assertRaises(InvalidEntry, HashedEntry.fromString, ' '.join(
 
304
                [a + "||", b, c]))
 
305
 
 
306
 
 
307
    def test_plainNotBase64(self):
 
308
        """
 
309
        If the key portion of a plain entry is not decodable as base64,
 
310
        C{fromString} should raise L{BinasciiError}.
 
311
        """
 
312
        self.notBase64Test(PlainEntry)
 
313
 
 
314
 
 
315
    def test_hashedNotBase64(self):
 
316
        """
 
317
        If the key, host salt, or host hash portion of a hashed entry is not
 
318
        encoded, it will raise L{BinasciiError}.
 
319
        """
 
320
        self.notBase64Test(HashedEntry)
 
321
        a, b, c = sampleHashedLine.split()
 
322
        # Salt not valid base64.
 
323
        self.assertRaises(
 
324
            BinasciiError, HashedEntry.fromString,
 
325
            ' '.join(["|1|x|" + b2a_base64("stuff").strip(), b, c]))
 
326
        # Host hash not valid base64.
 
327
        self.assertRaises(
 
328
            BinasciiError, HashedEntry.fromString,
 
329
            ' '.join([HashedEntry.MAGIC + b2a_base64("stuff").strip() + "|x", b, c]))
 
330
        # Neither salt nor hash valid base64.
 
331
        self.assertRaises(
 
332
            BinasciiError, HashedEntry.fromString,
 
333
            ' '.join(["|1|x|x", b, c]))
 
334
 
 
335
 
 
336
    def test_hashedBadKey(self):
 
337
        """
 
338
        If the key portion of the entry is valid base64, but is not actually an
 
339
        SSH key, C{HashedEntry.fromString} should raise L{BadKeyError}.
 
340
        """
 
341
        a, b, c = sampleHashedLine.split()
 
342
        self.badKeyTest(HashedEntry, a)
 
343
 
 
344
 
 
345
    def test_plainBadKey(self):
 
346
        """
 
347
        If the key portion of the entry is valid base64, but is not actually an
 
348
        SSH key, C{PlainEntry.fromString} should raise L{BadKeyError}.
 
349
        """
 
350
        self.badKeyTest(PlainEntry, "hostname")
 
351
 
 
352
 
 
353
 
 
354
class KnownHostsDatabaseTests(TestCase):
 
355
    """
 
356
    Tests for L{KnownHostsFile}.
 
357
    """
 
358
 
 
359
    def pathWithContent(self, content):
 
360
        """
 
361
        Return a FilePath with the given initial content.
 
362
        """
 
363
        fp = FilePath(self.mktemp())
 
364
        fp.setContent(content)
 
365
        return fp
 
366
 
 
367
 
 
368
    def loadSampleHostsFile(self, content=(
 
369
            sampleHashedLine + otherSamplePlaintextLine +
 
370
            "\n# That was a blank line.\n"
 
371
            "This is just unparseable.\n"
 
372
            "This also unparseable.\n")):
 
373
        """
 
374
        Return a sample hosts file, with keys for www.twistedmatrix.com and
 
375
        divmod.com present.
 
376
        """
 
377
        return KnownHostsFile.fromPath(self.pathWithContent(content))
 
378
 
 
379
 
 
380
    def test_loadFromPath(self):
 
381
        """
 
382
        Loading a L{KnownHostsFile} from a path with six entries in it will
 
383
        result in a L{KnownHostsFile} object with six L{IKnownHostEntry}
 
384
        providers in it, each of the appropriate type.
 
385
        """
 
386
        hostsFile = self.loadSampleHostsFile()
 
387
        self.assertEqual(len(hostsFile._entries), 6)
 
388
        self.assertIsInstance(hostsFile._entries[0], HashedEntry)
 
389
        self.assertEqual(True, hostsFile._entries[0].matchesHost(
 
390
                "www.twistedmatrix.com"))
 
391
        self.assertIsInstance(hostsFile._entries[1], PlainEntry)
 
392
        self.assertEqual(True, hostsFile._entries[1].matchesHost(
 
393
                "divmod.com"))
 
394
        self.assertIsInstance(hostsFile._entries[2], UnparsedEntry)
 
395
        self.assertEqual(hostsFile._entries[2].toString(), "")
 
396
        self.assertIsInstance(hostsFile._entries[3], UnparsedEntry)
 
397
        self.assertEqual(hostsFile._entries[3].toString(),
 
398
                         "# That was a blank line.")
 
399
        self.assertIsInstance(hostsFile._entries[4], UnparsedEntry)
 
400
        self.assertEqual(hostsFile._entries[4].toString(),
 
401
                         "This is just unparseable.")
 
402
        self.assertIsInstance(hostsFile._entries[5], UnparsedEntry)
 
403
        self.assertEqual(hostsFile._entries[5].toString(),
 
404
                         "This also unparseable.")
 
405
 
 
406
 
 
407
    def test_loadNonExistent(self):
 
408
        """
 
409
        Loading a L{KnownHostsFile} from a path that does not exist should
 
410
        result in an empty L{KnownHostsFile} that will save back to that path.
 
411
        """
 
412
        pn = self.mktemp()
 
413
        knownHostsFile = KnownHostsFile.fromPath(FilePath(pn))
 
414
        self.assertEqual([], list(knownHostsFile._entries))
 
415
        self.assertEqual(False, FilePath(pn).exists())
 
416
        knownHostsFile.save()
 
417
        self.assertEqual(True, FilePath(pn).exists())
 
418
 
 
419
 
 
420
    def test_loadNonExistentParent(self):
 
421
        """
 
422
        Loading a L{KnownHostsFile} from a path whose parent directory does not
 
423
        exist should result in an empty L{KnownHostsFile} that will save back
 
424
        to that path, creating its parent directory(ies) in the process.
 
425
        """
 
426
        thePath = FilePath(self.mktemp())
 
427
        knownHostsPath = thePath.child("foo").child("known_hosts")
 
428
        knownHostsFile = KnownHostsFile.fromPath(knownHostsPath)
 
429
        knownHostsFile.save()
 
430
        knownHostsPath.restat(False)
 
431
        self.assertEqual(True, knownHostsPath.exists())
 
432
 
 
433
 
 
434
    def test_savingAddsEntry(self):
 
435
        """
 
436
        L{KnownHostsFile.save()} will write out a new file with any entries
 
437
        that have been added.
 
438
        """
 
439
        path = self.pathWithContent(sampleHashedLine +
 
440
                                    otherSamplePlaintextLine)
 
441
        knownHostsFile = KnownHostsFile.fromPath(path)
 
442
        newEntry = knownHostsFile.addHostKey("some.example.com", Key.fromString(thirdSampleKey))
 
443
        expectedContent = (
 
444
            sampleHashedLine +
 
445
            otherSamplePlaintextLine + HashedEntry.MAGIC +
 
446
            b2a_base64(newEntry._hostSalt).strip() + "|" +
 
447
            b2a_base64(newEntry._hostHash).strip() + " ssh-rsa " +
 
448
            thirdSampleEncodedKey + "\n")
 
449
 
 
450
        # Sanity check, let's make sure the base64 API being used for the test
 
451
        # isn't inserting spurious newlines.
 
452
        self.assertEqual(3, expectedContent.count("\n"))
 
453
        knownHostsFile.save()
 
454
        self.assertEqual(expectedContent, path.getContent())
 
455
 
 
456
 
 
457
    def test_hasPresentKey(self):
 
458
        """
 
459
        L{KnownHostsFile.hasHostKey} returns C{True} when a key for the given
 
460
        hostname is present and matches the expected key.
 
461
        """
 
462
        hostsFile = self.loadSampleHostsFile()
 
463
        self.assertEqual(True, hostsFile.hasHostKey(
 
464
                "www.twistedmatrix.com", Key.fromString(sampleKey)))
 
465
 
 
466
 
 
467
    def test_hasNonPresentKey(self):
 
468
        """
 
469
        L{KnownHostsFile.hasHostKey} returns C{False} when a key for the given
 
470
        hostname is not present.
 
471
        """
 
472
        hostsFile = self.loadSampleHostsFile()
 
473
        self.assertEqual(False, hostsFile.hasHostKey(
 
474
                "non-existent.example.com", Key.fromString(sampleKey)))
 
475
 
 
476
 
 
477
    def test_hasKeyMismatch(self):
 
478
        """
 
479
        L{KnownHostsFile.hasHostKey} raises L{HostKeyChanged} if the host key
 
480
        is present, but different from the expected one.  The resulting
 
481
        exception should have an offendingEntry indicating the given entry.
 
482
        """
 
483
        hostsFile = self.loadSampleHostsFile()
 
484
        exception = self.assertRaises(
 
485
            HostKeyChanged, hostsFile.hasHostKey,
 
486
            "www.twistedmatrix.com", Key.fromString(otherSampleKey))
 
487
        self.assertEqual(exception.offendingEntry, hostsFile._entries[0])
 
488
        self.assertEqual(exception.lineno, 1)
 
489
        self.assertEqual(exception.path, hostsFile._savePath)
 
490
 
 
491
 
 
492
    def test_addHostKey(self):
 
493
        """
 
494
        L{KnownHostsFile.addHostKey} adds a new L{HashedEntry} to the host
 
495
        file, and returns it.
 
496
        """
 
497
        hostsFile = self.loadSampleHostsFile()
 
498
        aKey = Key.fromString(thirdSampleKey)
 
499
        self.assertEqual(False,
 
500
                         hostsFile.hasHostKey("somewhere.example.com", aKey))
 
501
        newEntry = hostsFile.addHostKey("somewhere.example.com", aKey)
 
502
 
 
503
        # The code in OpenSSH requires host salts to be 20 characters long.
 
504
        # This is the required length of a SHA-1 HMAC hash, so it's just a
 
505
        # sanity check.
 
506
        self.assertEqual(20, len(newEntry._hostSalt))
 
507
        self.assertEqual(True,
 
508
                         newEntry.matchesHost("somewhere.example.com"))
 
509
        self.assertEqual(newEntry.keyType, "ssh-rsa")
 
510
        self.assertEqual(aKey, newEntry.publicKey)
 
511
        self.assertEqual(True,
 
512
                         hostsFile.hasHostKey("somewhere.example.com", aKey))
 
513
 
 
514
 
 
515
    def test_randomSalts(self):
 
516
        """
 
517
        L{KnownHostsFile.addHostKey} generates a random salt for each new key,
 
518
        so subsequent salts will be different.
 
519
        """
 
520
        hostsFile = self.loadSampleHostsFile()
 
521
        aKey = Key.fromString(thirdSampleKey)
 
522
        self.assertNotEqual(
 
523
            hostsFile.addHostKey("somewhere.example.com", aKey)._hostSalt,
 
524
            hostsFile.addHostKey("somewhere-else.example.com", aKey)._hostSalt)
 
525
 
 
526
 
 
527
    def test_verifyValidKey(self):
 
528
        """
 
529
        Verifying a valid key should return a L{Deferred} which fires with
 
530
        True.
 
531
        """
 
532
        hostsFile = self.loadSampleHostsFile()
 
533
        hostsFile.addHostKey("1.2.3.4", Key.fromString(sampleKey))
 
534
        ui = FakeUI()
 
535
        d = hostsFile.verifyHostKey(ui, "www.twistedmatrix.com", "1.2.3.4",
 
536
                                    Key.fromString(sampleKey))
 
537
        l = []
 
538
        d.addCallback(l.append)
 
539
        self.assertEqual(l, [True])
 
540
 
 
541
 
 
542
    def test_verifyInvalidKey(self):
 
543
        """
 
544
        Verfying an invalid key should return a L{Deferred} which fires with a
 
545
        L{HostKeyChanged} failure.
 
546
        """
 
547
        hostsFile = self.loadSampleHostsFile()
 
548
        wrongKey = Key.fromString(thirdSampleKey)
 
549
        ui = FakeUI()
 
550
        hostsFile.addHostKey("1.2.3.4", Key.fromString(sampleKey))
 
551
        d = hostsFile.verifyHostKey(
 
552
            ui, "www.twistedmatrix.com", "1.2.3.4", wrongKey)
 
553
        return self.assertFailure(d, HostKeyChanged)
 
554
 
 
555
 
 
556
    def verifyNonPresentKey(self):
 
557
        """
 
558
        Set up a test to verify a key that isn't present.  Return a 3-tuple of
 
559
        the UI, a list set up to collect the result of the verifyHostKey call,
 
560
        and the sample L{KnownHostsFile} being used.
 
561
 
 
562
        This utility method avoids returning a L{Deferred}, and records results
 
563
        in the returned list instead, because the events which get generated
 
564
        here are pre-recorded in the 'ui' object.  If the L{Deferred} in
 
565
        question does not fire, the it will fail quickly with an empty list.
 
566
        """
 
567
        hostsFile = self.loadSampleHostsFile()
 
568
        absentKey = Key.fromString(thirdSampleKey)
 
569
        ui = FakeUI()
 
570
        l = []
 
571
        d = hostsFile.verifyHostKey(
 
572
            ui, "sample-host.example.com", "4.3.2.1", absentKey)
 
573
        d.addBoth(l.append)
 
574
        self.assertEqual([], l)
 
575
        self.assertEqual(
 
576
            ui.promptText,
 
577
            "The authenticity of host 'sample-host.example.com (4.3.2.1)' "
 
578
            "can't be established.\n"
 
579
            "RSA key fingerprint is "
 
580
            "89:4e:cc:8c:57:83:96:48:ef:63:ad:ee:99:00:4c:8f.\n"
 
581
            "Are you sure you want to continue connecting (yes/no)? ")
 
582
        return ui, l, hostsFile
 
583
 
 
584
 
 
585
    def test_verifyNonPresentKey_Yes(self):
 
586
        """
 
587
        Verifying a key where neither the hostname nor the IP are present
 
588
        should result in the UI being prompted with a message explaining as
 
589
        much.  If the UI says yes, the Deferred should fire with True.
 
590
        """
 
591
        ui, l, knownHostsFile = self.verifyNonPresentKey()
 
592
        ui.promptDeferred.callback(True)
 
593
        self.assertEqual([True], l)
 
594
        reloaded = KnownHostsFile.fromPath(knownHostsFile._savePath)
 
595
        self.assertEqual(
 
596
            True,
 
597
            reloaded.hasHostKey("4.3.2.1", Key.fromString(thirdSampleKey)))
 
598
        self.assertEqual(
 
599
            True,
 
600
            reloaded.hasHostKey("sample-host.example.com",
 
601
                                Key.fromString(thirdSampleKey)))
 
602
 
 
603
 
 
604
    def test_verifyNonPresentKey_No(self):
 
605
        """
 
606
        Verifying a key where neither the hostname nor the IP are present
 
607
        should result in the UI being prompted with a message explaining as
 
608
        much.  If the UI says no, the Deferred should fail with
 
609
        UserRejectedKey.
 
610
        """
 
611
        ui, l, knownHostsFile = self.verifyNonPresentKey()
 
612
        ui.promptDeferred.callback(False)
 
613
        l[0].trap(UserRejectedKey)
 
614
 
 
615
 
 
616
    def test_verifyHostIPMismatch(self):
 
617
        """
 
618
        Verifying a key where the host is present (and correct), but the IP is
 
619
        present and different, should result the deferred firing in a
 
620
        HostKeyChanged failure.
 
621
        """
 
622
        hostsFile = self.loadSampleHostsFile()
 
623
        wrongKey = Key.fromString(thirdSampleKey)
 
624
        ui = FakeUI()
 
625
        d = hostsFile.verifyHostKey(
 
626
            ui, "www.twistedmatrix.com", "4.3.2.1", wrongKey)
 
627
        return self.assertFailure(d, HostKeyChanged)
 
628
 
 
629
 
 
630
    def test_verifyKeyForHostAndIP(self):
 
631
        """
 
632
        Verifying a key where the hostname is present but the IP is not should
 
633
        result in the key being added for the IP and the user being warned
 
634
        about the change.
 
635
        """
 
636
        ui = FakeUI()
 
637
        hostsFile = self.loadSampleHostsFile()
 
638
        expectedKey = Key.fromString(sampleKey)
 
639
        hostsFile.verifyHostKey(
 
640
            ui, "www.twistedmatrix.com", "5.4.3.2", expectedKey)
 
641
        self.assertEqual(
 
642
            True, KnownHostsFile.fromPath(hostsFile._savePath).hasHostKey(
 
643
                "5.4.3.2", expectedKey))
 
644
        self.assertEqual(
 
645
            ["Warning: Permanently added the RSA host key for IP address "
 
646
             "'5.4.3.2' to the list of known hosts."],
 
647
            ui.userWarnings)
 
648
 
 
649
 
 
650
class FakeFile(object):
 
651
    """
 
652
    A fake file-like object that acts enough like a file for
 
653
    L{ConsoleUI.prompt}.
 
654
    """
 
655
 
 
656
    def __init__(self):
 
657
        self.inlines = []
 
658
        self.outchunks = []
 
659
        self.closed = False
 
660
 
 
661
 
 
662
    def readline(self):
 
663
        """
 
664
        Return a line from the 'inlines' list.
 
665
        """
 
666
        return self.inlines.pop(0)
 
667
 
 
668
 
 
669
    def write(self, chunk):
 
670
        """
 
671
        Append the given item to the 'outchunks' list.
 
672
        """
 
673
        if self.closed:
 
674
            raise IOError("the file was closed")
 
675
        self.outchunks.append(chunk)
 
676
 
 
677
 
 
678
    def close(self):
 
679
        """
 
680
        Set the 'closed' flag to True, explicitly marking that it has been
 
681
        closed.
 
682
        """
 
683
        self.closed = True
 
684
 
 
685
 
 
686
 
 
687
class ConsoleUITests(TestCase):
 
688
    """
 
689
    Test cases for L{ConsoleUI}.
 
690
    """
 
691
 
 
692
    def setUp(self):
 
693
        """
 
694
        Create a L{ConsoleUI} pointed at a L{FakeFile}.
 
695
        """
 
696
        self.fakeFile = FakeFile()
 
697
        self.ui = ConsoleUI(self.openFile)
 
698
 
 
699
 
 
700
    def openFile(self):
 
701
        """
 
702
        Return the current fake file.
 
703
        """
 
704
        return self.fakeFile
 
705
 
 
706
 
 
707
    def newFile(self, lines):
 
708
        """
 
709
        Create a new fake file (the next file that self.ui will open) with the
 
710
        given list of lines to be returned from readline().
 
711
        """
 
712
        self.fakeFile = FakeFile()
 
713
        self.fakeFile.inlines = lines
 
714
 
 
715
 
 
716
    def test_promptYes(self):
 
717
        """
 
718
        L{ConsoleUI.prompt} writes a message to the console, then reads a line.
 
719
        If that line is 'yes', then it returns a L{Deferred} that fires with
 
720
        True.
 
721
        """
 
722
        for okYes in ['yes', 'Yes', 'yes\n']:
 
723
            self.newFile([okYes])
 
724
            l = []
 
725
            self.ui.prompt("Hello, world!").addCallback(l.append)
 
726
            self.assertEqual(["Hello, world!"], self.fakeFile.outchunks)
 
727
            self.assertEqual([True], l)
 
728
            self.assertEqual(True, self.fakeFile.closed)
 
729
 
 
730
 
 
731
    def test_promptNo(self):
 
732
        """
 
733
        L{ConsoleUI.prompt} writes a message to the console, then reads a line.
 
734
        If that line is 'no', then it returns a L{Deferred} that fires with
 
735
        False.
 
736
        """
 
737
        for okNo in ['no', 'No', 'no\n']:
 
738
            self.newFile([okNo])
 
739
            l = []
 
740
            self.ui.prompt("Goodbye, world!").addCallback(l.append)
 
741
            self.assertEqual(["Goodbye, world!"], self.fakeFile.outchunks)
 
742
            self.assertEqual([False], l)
 
743
            self.assertEqual(True, self.fakeFile.closed)
 
744
 
 
745
 
 
746
    def test_promptRepeatedly(self):
 
747
        """
 
748
        L{ConsoleUI.prompt} writes a message to the console, then reads a line.
 
749
        If that line is neither 'yes' nor 'no', then it says "Please enter
 
750
        'yes' or 'no'" until it gets a 'yes' or a 'no', at which point it
 
751
        returns a Deferred that answers either True or False.
 
752
        """
 
753
        self.newFile(['what', 'uh', 'okay', 'yes'])
 
754
        l = []
 
755
        self.ui.prompt("Please say something useful.").addCallback(l.append)
 
756
        self.assertEqual([True], l)
 
757
        self.assertEqual(self.fakeFile.outchunks,
 
758
                         ["Please say something useful."] +
 
759
                         ["Please type 'yes' or 'no': "] * 3)
 
760
        self.assertEqual(True, self.fakeFile.closed)
 
761
        self.newFile(['blah', 'stuff', 'feh', 'no'])
 
762
        l = []
 
763
        self.ui.prompt("Please say something negative.").addCallback(l.append)
 
764
        self.assertEqual([False], l)
 
765
        self.assertEqual(self.fakeFile.outchunks,
 
766
                         ["Please say something negative."] +
 
767
                         ["Please type 'yes' or 'no': "] * 3)
 
768
        self.assertEqual(True, self.fakeFile.closed)
 
769
 
 
770
 
 
771
    def test_promptOpenFailed(self):
 
772
        """
 
773
        If the C{opener} passed to L{ConsoleUI} raises an exception, that
 
774
        exception will fail the L{Deferred} returned from L{ConsoleUI.prompt}.
 
775
        """
 
776
        def raiseIt():
 
777
            raise IOError()
 
778
        ui = ConsoleUI(raiseIt)
 
779
        d = ui.prompt("This is a test.")
 
780
        return self.assertFailure(d, IOError)
 
781
 
 
782
 
 
783
    def test_warn(self):
 
784
        """
 
785
        L{ConsoleUI.warn} should output a message to the console object.
 
786
        """
 
787
        self.ui.warn("Test message.")
 
788
        self.assertEqual(["Test message."], self.fakeFile.outchunks)
 
789
        self.assertEqual(True, self.fakeFile.closed)
 
790
 
 
791
 
 
792
    def test_warnOpenFailed(self):
 
793
        """
 
794
        L{ConsoleUI.warn} should log a traceback if the output can't be opened.
 
795
        """
 
796
        def raiseIt():
 
797
            1 / 0
 
798
        ui = ConsoleUI(raiseIt)
 
799
        ui.warn("This message never makes it.")
 
800
        self.assertEqual(len(self.flushLoggedErrors(ZeroDivisionError)), 1)
 
801
 
 
802
 
 
803
 
 
804
class FakeUI(object):
 
805
    """
 
806
    A fake UI object, adhering to the interface expected by
 
807
    L{KnownHostsFile.verifyHostKey}
 
808
 
 
809
    @ivar userWarnings: inputs provided to 'warn'.
 
810
 
 
811
    @ivar promptDeferred: last result returned from 'prompt'.
 
812
 
 
813
    @ivar promptText: the last input provided to 'prompt'.
 
814
    """
 
815
 
 
816
    def __init__(self):
 
817
        self.userWarnings = []
 
818
        self.promptDeferred = None
 
819
        self.promptText = None
 
820
 
 
821
 
 
822
    def prompt(self, text):
 
823
        """
 
824
        Issue the user an interactive prompt, which they can accept or deny.
 
825
        """
 
826
        self.promptText = text
 
827
        self.promptDeferred = Deferred()
 
828
        return self.promptDeferred
 
829
 
 
830
 
 
831
    def warn(self, text):
 
832
        """
 
833
        Issue a non-interactive warning to the user.
 
834
        """
 
835
        self.userWarnings.append(text)
 
836
 
 
837
 
 
838
 
 
839
class FakeObject(object):
 
840
    """
 
841
    A fake object that can have some attributes.  Used to fake
 
842
    L{SSHClientTransport} and L{SSHClientFactory}.
 
843
    """
 
844
 
 
845
 
 
846
class DefaultAPITests(TestCase):
 
847
    """
 
848
    The API in L{twisted.conch.client.default.verifyHostKey} is the integration
 
849
    point between the code in the rest of conch and L{KnownHostsFile}.
 
850
    """
 
851
 
 
852
    def patchedOpen(self, fname, mode):
 
853
        """
 
854
        The patched version of 'open'; this returns a L{FakeFile} that the
 
855
        instantiated L{ConsoleUI} can use.
 
856
        """
 
857
        self.assertEqual(fname, "/dev/tty")
 
858
        self.assertEqual(mode, "r+b")
 
859
        return self.fakeFile
 
860
 
 
861
 
 
862
    def setUp(self):
 
863
        """
 
864
        Patch 'open' in verifyHostKey.
 
865
        """
 
866
        self.fakeFile = FakeFile()
 
867
        self.patch(default, "_open", self.patchedOpen)
 
868
        self.hostsOption = self.mktemp()
 
869
        knownHostsFile = KnownHostsFile(FilePath(self.hostsOption))
 
870
        knownHostsFile.addHostKey("exists.example.com", Key.fromString(sampleKey))
 
871
        knownHostsFile.addHostKey("4.3.2.1", Key.fromString(sampleKey))
 
872
        knownHostsFile.save()
 
873
        self.fakeTransport = FakeObject()
 
874
        self.fakeTransport.factory = FakeObject()
 
875
        self.options = self.fakeTransport.factory.options = {
 
876
            'host': "exists.example.com",
 
877
            'known-hosts': self.hostsOption
 
878
            }
 
879
 
 
880
 
 
881
    def test_verifyOKKey(self):
 
882
        """
 
883
        L{default.verifyHostKey} should return a L{Deferred} which fires with
 
884
        C{1} when passed a host, IP, and key which already match the
 
885
        known_hosts file it is supposed to check.
 
886
        """
 
887
        l = []
 
888
        default.verifyHostKey(self.fakeTransport, "4.3.2.1", sampleKey,
 
889
                              "I don't care.").addCallback(l.append)
 
890
        self.assertEqual([1], l)
 
891
 
 
892
 
 
893
    def replaceHome(self, tempHome):
 
894
        """
 
895
        Replace the HOME environment variable until the end of the current
 
896
        test, with the given new home-directory, so that L{os.path.expanduser}
 
897
        will yield controllable, predictable results.
 
898
 
 
899
        @param tempHome: the pathname to replace the HOME variable with.
 
900
 
 
901
        @type tempHome: L{str}
 
902
        """
 
903
        oldHome = os.environ.get('HOME')
 
904
        def cleanupHome():
 
905
            if oldHome is None:
 
906
                del os.environ['HOME']
 
907
            else:
 
908
                os.environ['HOME'] = oldHome
 
909
        self.addCleanup(cleanupHome)
 
910
        os.environ['HOME'] = tempHome
 
911
 
 
912
 
 
913
    def test_noKnownHostsOption(self):
 
914
        """
 
915
        L{default.verifyHostKey} should find your known_hosts file in
 
916
        ~/.ssh/known_hosts if you don't specify one explicitly on the command
 
917
        line.
 
918
        """
 
919
        l = []
 
920
        tmpdir = self.mktemp()
 
921
        oldHostsOption = self.hostsOption
 
922
        hostsNonOption = FilePath(tmpdir).child(".ssh").child("known_hosts")
 
923
        hostsNonOption.parent().makedirs()
 
924
        FilePath(oldHostsOption).moveTo(hostsNonOption)
 
925
        self.replaceHome(tmpdir)
 
926
        self.options['known-hosts'] = None
 
927
        default.verifyHostKey(self.fakeTransport, "4.3.2.1", sampleKey,
 
928
                              "I don't care.").addCallback(l.append)
 
929
        self.assertEqual([1], l)
 
930
 
 
931
 
 
932
    def test_verifyHostButNotIP(self):
 
933
        """
 
934
        L{default.verifyHostKey} should return a L{Deferred} which fires with
 
935
        C{1} when passed a host which matches with an IP is not present in its
 
936
        known_hosts file, and should also warn the user that it has added the
 
937
        IP address.
 
938
        """
 
939
        l = []
 
940
        default.verifyHostKey(self.fakeTransport, "8.7.6.5", sampleKey,
 
941
                              "Fingerprint not required.").addCallback(l.append)
 
942
        self.assertEqual(
 
943
            ["Warning: Permanently added the RSA host key for IP address "
 
944
            "'8.7.6.5' to the list of known hosts."],
 
945
            self.fakeFile.outchunks)
 
946
        self.assertEqual([1], l)
 
947
        knownHostsFile = KnownHostsFile.fromPath(FilePath(self.hostsOption))
 
948
        self.assertEqual(True, knownHostsFile.hasHostKey("8.7.6.5",
 
949
                                             Key.fromString(sampleKey)))
 
950
 
 
951
 
 
952
    def test_verifyQuestion(self):
 
953
        """
 
954
        L{default.verifyHostKey} should return a L{Default} which fires with
 
955
        C{0} when passed a unknown host that the user refuses to acknowledge.
 
956
        """
 
957
        self.fakeTransport.factory.options['host'] = 'fake.example.com'
 
958
        self.fakeFile.inlines.append("no")
 
959
        d = default.verifyHostKey(
 
960
            self.fakeTransport, "9.8.7.6", otherSampleKey, "No fingerprint!")
 
961
        self.assertEqual(
 
962
            ["The authenticity of host 'fake.example.com (9.8.7.6)' "
 
963
             "can't be established.\n"
 
964
             "RSA key fingerprint is "
 
965
             "57:a1:c2:a1:07:a0:2b:f4:ce:b5:e5:b7:ae:cc:e1:99.\n"
 
966
              "Are you sure you want to continue connecting (yes/no)? "],
 
967
             self.fakeFile.outchunks)
 
968
        return self.assertFailure(d, UserRejectedKey)
 
969
 
 
970
 
 
971
    def test_verifyBadKey(self):
 
972
        """
 
973
        L{default.verifyHostKey} should return a L{Deferred} which fails with
 
974
        L{HostKeyChanged} if the host key is incorrect.
 
975
        """
 
976
        d = default.verifyHostKey(
 
977
            self.fakeTransport, "4.3.2.1", otherSampleKey,
 
978
            "Again, not required.")
 
979
        return self.assertFailure(d, HostKeyChanged)