1
# Copyright (c) 2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.conch.client.knownhosts}.
9
from binascii import Error as BinasciiError, b2a_base64, a2b_base64
15
skip = "PyCrypto and PyASN1 required for twisted.conch.knownhosts."
17
from twisted.conch.ssh.keys import Key, BadKeyError
18
from twisted.conch.client.knownhosts import \
19
PlainEntry, HashedEntry, KnownHostsFile, UnparsedEntry, ConsoleUI
20
from twisted.conch.client import default
22
from zope.interface.verify import verifyObject
24
from twisted.python.filepath import FilePath
25
from twisted.trial.unittest import TestCase
26
from twisted.internet.defer import Deferred
27
from twisted.conch.interfaces import IKnownHostEntry
28
from twisted.conch.error import HostKeyChanged, UserRejectedKey, InvalidEntry
32
'AAAAB3NzaC1yc2EAAAABIwAAAQEAsV0VMRbGmzhqxxayLRHmvnFvtyNqgbNKV46dU1bVFB+3y'
33
'tNvue4Riqv/SVkPRNwMb7eWH29SviXaBxUhYyzKkDoNUq3rTNnH1Vnif6d6X4JCrUb5d3W+Dm'
34
'YClyJrZ5HgD/hUpdSkTRqdbQ2TrvSAxRacj+vHHT4F4dm1bJSewm3B2D8HVOoi/CbVh3dsIiC'
35
'dp8VltdZx4qYVfYe2LwVINCbAa3d3tj9ma7RVfw3OH2Mfb+toLd1N5tBQFb7oqTt2nC6I/6Bd'
36
'4JwPUld+IEitw/suElq/AIJVQXXujeyiZlea90HE65U2mF1ytr17HTAIT2ySokJWyuBANGACk'
39
otherSampleEncodedKey = (
40
'AAAAB3NzaC1yc2EAAAABIwAAAIEAwaeCZd3UCuPXhX39+/p9qO028jTF76DMVd9mPvYVDVXuf'
41
'WckKZauF7+0b7qm+ChT7kan6BzRVo4++gCVNfAlMzLysSt3ylmOR48tFpAfygg9UCX3DjHz0E'
42
'lOOUKh3iifc9aUShD0OPaK3pR5JJ8jfiBfzSYWt/hDi/iZ4igsSs8=')
44
thirdSampleEncodedKey = (
45
'AAAAB3NzaC1yc2EAAAABIwAAAQEAl/TQakPkePlnwCBRPitIVUTg6Z8VzN1en+DGkyo/evkmLw'
46
'7o4NWR5qbysk9A9jXW332nxnEuAnbcCam9SHe1su1liVfyIK0+3bdn0YRB0sXIbNEtMs2LtCho'
47
'/aV3cXPS+Cf1yut3wvIpaRnAzXxuKPCTXQ7/y0IXa8TwkRBH58OJa3RqfQ/NsSp5SAfdsrHyH2'
48
'aitiVKm2jfbTKzSEqOQG/zq4J9GXTkq61gZugory/Tvl5/yPgSnOR6C9jVOMHf27ZPoRtyj9SY'
49
'343Hd2QHiIE0KPZJEgCynKeWoKz8v6eTSK8n4rBnaqWdp8MnGZK1WGy05MguXbyCDuTC8AmJXQ'
52
sampleKey = a2b_base64(sampleEncodedKey)
53
otherSampleKey = a2b_base64(otherSampleEncodedKey)
54
thirdSampleKey = a2b_base64(thirdSampleEncodedKey)
56
samplePlaintextLine = (
57
"www.twistedmatrix.com ssh-rsa " + sampleEncodedKey + "\n")
59
otherSamplePlaintextLine = (
60
"divmod.com ssh-rsa " + otherSampleEncodedKey + "\n")
63
"www.twistedmatrix.com,198.49.126.131 ssh-rsa " + sampleEncodedKey + "\n")
66
"|1|gJbSEPBG9ZSBoZpHNtZBD1bHKBA=|bQv+0Xa0dByrwkA1EB0E7Xop/Fo= ssh-rsa " +
67
sampleEncodedKey + "\n")
71
class EntryTestsMixin:
73
Tests for implementations of L{IKnownHostEntry}. Subclasses must set the
74
'entry' attribute to a provider of that interface, the implementation of
75
that interface under test.
77
@ivar entry: a provider of L{IKnownHostEntry} with a hostname of
78
www.twistedmatrix.com and an RSA key of sampleKey.
81
def test_providesInterface(self):
83
The given entry should provide IKnownHostEntry.
85
verifyObject(IKnownHostEntry, self.entry)
88
def test_fromString(self):
90
Constructing a plain text entry from an unhashed known_hosts entry will
91
result in an L{IKnownHostEntry} provider with 'keyString', 'hostname',
92
and 'keyType' attributes. While outside the interface in question,
93
these attributes are held in common by L{PlainEntry} and L{HashedEntry}
94
implementations; other implementations should override this method in
98
self.assertEqual(entry.publicKey, Key.fromString(sampleKey))
99
self.assertEqual(entry.keyType, "ssh-rsa")
102
def test_matchesKey(self):
104
L{IKnownHostEntry.matchesKey} checks to see if an entry matches a given
107
twistedmatrixDotCom = Key.fromString(sampleKey)
108
divmodDotCom = Key.fromString(otherSampleKey)
111
self.entry.matchesKey(twistedmatrixDotCom))
114
self.entry.matchesKey(divmodDotCom))
117
def test_matchesHost(self):
119
L{IKnownHostEntry.matchesHost} checks to see if an entry matches a
122
self.assertEqual(True, self.entry.matchesHost(
123
"www.twistedmatrix.com"))
124
self.assertEqual(False, self.entry.matchesHost(
129
class PlainEntryTests(EntryTestsMixin, TestCase):
131
Test cases for L{PlainEntry}.
133
plaintextLine = samplePlaintextLine
134
hostIPLine = sampleHostIPLine
138
Set 'entry' to a sample plain-text entry with sampleKey as its key.
140
self.entry = PlainEntry.fromString(self.plaintextLine)
143
def test_matchesHostIP(self):
145
A "hostname,ip" formatted line will match both the host and the IP.
147
self.entry = PlainEntry.fromString(self.hostIPLine)
148
self.assertEqual(True, self.entry.matchesHost("198.49.126.131"))
149
self.test_matchesHost()
152
def test_toString(self):
154
L{PlainEntry.toString} generates the serialized OpenSSL format string
155
for the entry, sans newline.
157
self.assertEqual(self.entry.toString(), self.plaintextLine.rstrip("\n"))
158
multiHostEntry = PlainEntry.fromString(self.hostIPLine)
159
self.assertEqual(multiHostEntry.toString(), self.hostIPLine.rstrip("\n"))
163
class PlainTextWithCommentTests(PlainEntryTests):
165
Test cases for L{PlainEntry} when parsed from a line with a comment.
168
plaintextLine = samplePlaintextLine[:-1] + " plain text comment.\n"
169
hostIPLine = sampleHostIPLine[:-1] + " text following host/IP line\n"
173
class HashedEntryTests(EntryTestsMixin, TestCase):
175
Tests for L{HashedEntry}.
177
This suite doesn't include any tests for host/IP pairs because hashed
178
entries store IP addresses the same way as hostnames and does not support
179
comma-separated lists. (If you hash the IP and host together you can't
180
tell if you've got the key already for one or the other.)
182
hashedLine = sampleHashedLine
186
Set 'entry' to a sample hashed entry for twistedmatrix.com with
187
sampleKey as its key.
189
self.entry = HashedEntry.fromString(self.hashedLine)
192
def test_toString(self):
194
L{HashedEntry.toString} generates the serialized OpenSSL format string
195
for the entry, sans the newline.
197
self.assertEqual(self.entry.toString(), self.hashedLine.rstrip("\n"))
201
class HashedEntryWithCommentTests(HashedEntryTests):
203
Test cases for L{PlainEntry} when parsed from a line with a comment.
206
hashedLine = sampleHashedLine[:-1] + " plain text comment.\n"
210
class UnparsedEntryTests(TestCase, EntryTestsMixin):
212
Tests for L{UnparsedEntry}
216
Set up the 'entry' to be an unparsed entry for some random text.
218
self.entry = UnparsedEntry(" This is a bogus entry. \n")
221
def test_fromString(self):
223
Creating an L{UnparsedEntry} should simply record the string it was
226
self.assertEqual(" This is a bogus entry. \n",
230
def test_matchesHost(self):
232
An unparsed entry can't match any hosts.
234
self.assertEqual(False, self.entry.matchesHost("www.twistedmatrix.com"))
237
def test_matchesKey(self):
239
An unparsed entry can't match any keys.
241
self.assertEqual(False, self.entry.matchesKey(Key.fromString(sampleKey)))
244
def test_toString(self):
246
L{UnparsedEntry.toString} returns its input string, sans trailing newline.
248
self.assertEqual(" This is a bogus entry. ", self.entry.toString())
252
class ParseErrorTests(TestCase):
254
L{HashedEntry.fromString} and L{PlainEntry.fromString} can raise a variety
255
of errors depending on misformattings of certain strings. These tests make
256
sure those errors are caught. Since many of the ways that this can go
257
wrong are in the lower-level APIs being invoked by the parsing logic,
258
several of these are integration tests with the L{base64} and
259
L{twisted.conch.ssh.keys} modules.
262
def invalidEntryTest(self, cls):
264
If there are fewer than three elements, C{fromString} should raise
267
self.assertRaises(InvalidEntry, cls.fromString, "invalid")
270
def notBase64Test(self, cls):
272
If the key is not base64, C{fromString} should raise L{BinasciiError}.
274
self.assertRaises(BinasciiError, cls.fromString, "x x x")
277
def badKeyTest(self, cls, prefix):
279
If the key portion of the entry is valid base64, but is not actually an
280
SSH key, C{fromString} should raise L{BadKeyError}.
282
self.assertRaises(BadKeyError, cls.fromString, ' '.join(
283
[prefix, "ssh-rsa", b2a_base64(
284
"Hey, this isn't an SSH key!").strip()]))
287
def test_invalidPlainEntry(self):
289
If there are fewer than three whitespace-separated elements in an
290
entry, L{PlainEntry.fromString} should raise L{InvalidEntry}.
292
self.invalidEntryTest(PlainEntry)
295
def test_invalidHashedEntry(self):
297
If there are fewer than three whitespace-separated elements in an
298
entry, or the hostname salt/hash portion has more than two elements,
299
L{HashedEntry.fromString} should raise L{InvalidEntry}.
301
self.invalidEntryTest(HashedEntry)
302
a, b, c = sampleHashedLine.split()
303
self.assertRaises(InvalidEntry, HashedEntry.fromString, ' '.join(
307
def test_plainNotBase64(self):
309
If the key portion of a plain entry is not decodable as base64,
310
C{fromString} should raise L{BinasciiError}.
312
self.notBase64Test(PlainEntry)
315
def test_hashedNotBase64(self):
317
If the key, host salt, or host hash portion of a hashed entry is not
318
encoded, it will raise L{BinasciiError}.
320
self.notBase64Test(HashedEntry)
321
a, b, c = sampleHashedLine.split()
322
# Salt not valid base64.
324
BinasciiError, HashedEntry.fromString,
325
' '.join(["|1|x|" + b2a_base64("stuff").strip(), b, c]))
326
# Host hash not valid base64.
328
BinasciiError, HashedEntry.fromString,
329
' '.join([HashedEntry.MAGIC + b2a_base64("stuff").strip() + "|x", b, c]))
330
# Neither salt nor hash valid base64.
332
BinasciiError, HashedEntry.fromString,
333
' '.join(["|1|x|x", b, c]))
336
def test_hashedBadKey(self):
338
If the key portion of the entry is valid base64, but is not actually an
339
SSH key, C{HashedEntry.fromString} should raise L{BadKeyError}.
341
a, b, c = sampleHashedLine.split()
342
self.badKeyTest(HashedEntry, a)
345
def test_plainBadKey(self):
347
If the key portion of the entry is valid base64, but is not actually an
348
SSH key, C{PlainEntry.fromString} should raise L{BadKeyError}.
350
self.badKeyTest(PlainEntry, "hostname")
354
class KnownHostsDatabaseTests(TestCase):
356
Tests for L{KnownHostsFile}.
359
def pathWithContent(self, content):
361
Return a FilePath with the given initial content.
363
fp = FilePath(self.mktemp())
364
fp.setContent(content)
368
def loadSampleHostsFile(self, content=(
369
sampleHashedLine + otherSamplePlaintextLine +
370
"\n# That was a blank line.\n"
371
"This is just unparseable.\n"
372
"This also unparseable.\n")):
374
Return a sample hosts file, with keys for www.twistedmatrix.com and
377
return KnownHostsFile.fromPath(self.pathWithContent(content))
380
def test_loadFromPath(self):
382
Loading a L{KnownHostsFile} from a path with six entries in it will
383
result in a L{KnownHostsFile} object with six L{IKnownHostEntry}
384
providers in it, each of the appropriate type.
386
hostsFile = self.loadSampleHostsFile()
387
self.assertEqual(len(hostsFile._entries), 6)
388
self.assertIsInstance(hostsFile._entries[0], HashedEntry)
389
self.assertEqual(True, hostsFile._entries[0].matchesHost(
390
"www.twistedmatrix.com"))
391
self.assertIsInstance(hostsFile._entries[1], PlainEntry)
392
self.assertEqual(True, hostsFile._entries[1].matchesHost(
394
self.assertIsInstance(hostsFile._entries[2], UnparsedEntry)
395
self.assertEqual(hostsFile._entries[2].toString(), "")
396
self.assertIsInstance(hostsFile._entries[3], UnparsedEntry)
397
self.assertEqual(hostsFile._entries[3].toString(),
398
"# That was a blank line.")
399
self.assertIsInstance(hostsFile._entries[4], UnparsedEntry)
400
self.assertEqual(hostsFile._entries[4].toString(),
401
"This is just unparseable.")
402
self.assertIsInstance(hostsFile._entries[5], UnparsedEntry)
403
self.assertEqual(hostsFile._entries[5].toString(),
404
"This also unparseable.")
407
def test_loadNonExistent(self):
409
Loading a L{KnownHostsFile} from a path that does not exist should
410
result in an empty L{KnownHostsFile} that will save back to that path.
413
knownHostsFile = KnownHostsFile.fromPath(FilePath(pn))
414
self.assertEqual([], list(knownHostsFile._entries))
415
self.assertEqual(False, FilePath(pn).exists())
416
knownHostsFile.save()
417
self.assertEqual(True, FilePath(pn).exists())
420
def test_loadNonExistentParent(self):
422
Loading a L{KnownHostsFile} from a path whose parent directory does not
423
exist should result in an empty L{KnownHostsFile} that will save back
424
to that path, creating its parent directory(ies) in the process.
426
thePath = FilePath(self.mktemp())
427
knownHostsPath = thePath.child("foo").child("known_hosts")
428
knownHostsFile = KnownHostsFile.fromPath(knownHostsPath)
429
knownHostsFile.save()
430
knownHostsPath.restat(False)
431
self.assertEqual(True, knownHostsPath.exists())
434
def test_savingAddsEntry(self):
436
L{KnownHostsFile.save()} will write out a new file with any entries
437
that have been added.
439
path = self.pathWithContent(sampleHashedLine +
440
otherSamplePlaintextLine)
441
knownHostsFile = KnownHostsFile.fromPath(path)
442
newEntry = knownHostsFile.addHostKey("some.example.com", Key.fromString(thirdSampleKey))
445
otherSamplePlaintextLine + HashedEntry.MAGIC +
446
b2a_base64(newEntry._hostSalt).strip() + "|" +
447
b2a_base64(newEntry._hostHash).strip() + " ssh-rsa " +
448
thirdSampleEncodedKey + "\n")
450
# Sanity check, let's make sure the base64 API being used for the test
451
# isn't inserting spurious newlines.
452
self.assertEqual(3, expectedContent.count("\n"))
453
knownHostsFile.save()
454
self.assertEqual(expectedContent, path.getContent())
457
def test_hasPresentKey(self):
459
L{KnownHostsFile.hasHostKey} returns C{True} when a key for the given
460
hostname is present and matches the expected key.
462
hostsFile = self.loadSampleHostsFile()
463
self.assertEqual(True, hostsFile.hasHostKey(
464
"www.twistedmatrix.com", Key.fromString(sampleKey)))
467
def test_hasNonPresentKey(self):
469
L{KnownHostsFile.hasHostKey} returns C{False} when a key for the given
470
hostname is not present.
472
hostsFile = self.loadSampleHostsFile()
473
self.assertEqual(False, hostsFile.hasHostKey(
474
"non-existent.example.com", Key.fromString(sampleKey)))
477
def test_hasKeyMismatch(self):
479
L{KnownHostsFile.hasHostKey} raises L{HostKeyChanged} if the host key
480
is present, but different from the expected one. The resulting
481
exception should have an offendingEntry indicating the given entry.
483
hostsFile = self.loadSampleHostsFile()
484
exception = self.assertRaises(
485
HostKeyChanged, hostsFile.hasHostKey,
486
"www.twistedmatrix.com", Key.fromString(otherSampleKey))
487
self.assertEqual(exception.offendingEntry, hostsFile._entries[0])
488
self.assertEqual(exception.lineno, 1)
489
self.assertEqual(exception.path, hostsFile._savePath)
492
def test_addHostKey(self):
494
L{KnownHostsFile.addHostKey} adds a new L{HashedEntry} to the host
495
file, and returns it.
497
hostsFile = self.loadSampleHostsFile()
498
aKey = Key.fromString(thirdSampleKey)
499
self.assertEqual(False,
500
hostsFile.hasHostKey("somewhere.example.com", aKey))
501
newEntry = hostsFile.addHostKey("somewhere.example.com", aKey)
503
# The code in OpenSSH requires host salts to be 20 characters long.
504
# This is the required length of a SHA-1 HMAC hash, so it's just a
506
self.assertEqual(20, len(newEntry._hostSalt))
507
self.assertEqual(True,
508
newEntry.matchesHost("somewhere.example.com"))
509
self.assertEqual(newEntry.keyType, "ssh-rsa")
510
self.assertEqual(aKey, newEntry.publicKey)
511
self.assertEqual(True,
512
hostsFile.hasHostKey("somewhere.example.com", aKey))
515
def test_randomSalts(self):
517
L{KnownHostsFile.addHostKey} generates a random salt for each new key,
518
so subsequent salts will be different.
520
hostsFile = self.loadSampleHostsFile()
521
aKey = Key.fromString(thirdSampleKey)
523
hostsFile.addHostKey("somewhere.example.com", aKey)._hostSalt,
524
hostsFile.addHostKey("somewhere-else.example.com", aKey)._hostSalt)
527
def test_verifyValidKey(self):
529
Verifying a valid key should return a L{Deferred} which fires with
532
hostsFile = self.loadSampleHostsFile()
533
hostsFile.addHostKey("1.2.3.4", Key.fromString(sampleKey))
535
d = hostsFile.verifyHostKey(ui, "www.twistedmatrix.com", "1.2.3.4",
536
Key.fromString(sampleKey))
538
d.addCallback(l.append)
539
self.assertEqual(l, [True])
542
def test_verifyInvalidKey(self):
544
Verfying an invalid key should return a L{Deferred} which fires with a
545
L{HostKeyChanged} failure.
547
hostsFile = self.loadSampleHostsFile()
548
wrongKey = Key.fromString(thirdSampleKey)
550
hostsFile.addHostKey("1.2.3.4", Key.fromString(sampleKey))
551
d = hostsFile.verifyHostKey(
552
ui, "www.twistedmatrix.com", "1.2.3.4", wrongKey)
553
return self.assertFailure(d, HostKeyChanged)
556
def verifyNonPresentKey(self):
558
Set up a test to verify a key that isn't present. Return a 3-tuple of
559
the UI, a list set up to collect the result of the verifyHostKey call,
560
and the sample L{KnownHostsFile} being used.
562
This utility method avoids returning a L{Deferred}, and records results
563
in the returned list instead, because the events which get generated
564
here are pre-recorded in the 'ui' object. If the L{Deferred} in
565
question does not fire, the it will fail quickly with an empty list.
567
hostsFile = self.loadSampleHostsFile()
568
absentKey = Key.fromString(thirdSampleKey)
571
d = hostsFile.verifyHostKey(
572
ui, "sample-host.example.com", "4.3.2.1", absentKey)
574
self.assertEqual([], l)
577
"The authenticity of host 'sample-host.example.com (4.3.2.1)' "
578
"can't be established.\n"
579
"RSA key fingerprint is "
580
"89:4e:cc:8c:57:83:96:48:ef:63:ad:ee:99:00:4c:8f.\n"
581
"Are you sure you want to continue connecting (yes/no)? ")
582
return ui, l, hostsFile
585
def test_verifyNonPresentKey_Yes(self):
587
Verifying a key where neither the hostname nor the IP are present
588
should result in the UI being prompted with a message explaining as
589
much. If the UI says yes, the Deferred should fire with True.
591
ui, l, knownHostsFile = self.verifyNonPresentKey()
592
ui.promptDeferred.callback(True)
593
self.assertEqual([True], l)
594
reloaded = KnownHostsFile.fromPath(knownHostsFile._savePath)
597
reloaded.hasHostKey("4.3.2.1", Key.fromString(thirdSampleKey)))
600
reloaded.hasHostKey("sample-host.example.com",
601
Key.fromString(thirdSampleKey)))
604
def test_verifyNonPresentKey_No(self):
606
Verifying a key where neither the hostname nor the IP are present
607
should result in the UI being prompted with a message explaining as
608
much. If the UI says no, the Deferred should fail with
611
ui, l, knownHostsFile = self.verifyNonPresentKey()
612
ui.promptDeferred.callback(False)
613
l[0].trap(UserRejectedKey)
616
def test_verifyHostIPMismatch(self):
618
Verifying a key where the host is present (and correct), but the IP is
619
present and different, should result the deferred firing in a
620
HostKeyChanged failure.
622
hostsFile = self.loadSampleHostsFile()
623
wrongKey = Key.fromString(thirdSampleKey)
625
d = hostsFile.verifyHostKey(
626
ui, "www.twistedmatrix.com", "4.3.2.1", wrongKey)
627
return self.assertFailure(d, HostKeyChanged)
630
def test_verifyKeyForHostAndIP(self):
632
Verifying a key where the hostname is present but the IP is not should
633
result in the key being added for the IP and the user being warned
637
hostsFile = self.loadSampleHostsFile()
638
expectedKey = Key.fromString(sampleKey)
639
hostsFile.verifyHostKey(
640
ui, "www.twistedmatrix.com", "5.4.3.2", expectedKey)
642
True, KnownHostsFile.fromPath(hostsFile._savePath).hasHostKey(
643
"5.4.3.2", expectedKey))
645
["Warning: Permanently added the RSA host key for IP address "
646
"'5.4.3.2' to the list of known hosts."],
650
class FakeFile(object):
652
A fake file-like object that acts enough like a file for
664
Return a line from the 'inlines' list.
666
return self.inlines.pop(0)
669
def write(self, chunk):
671
Append the given item to the 'outchunks' list.
674
raise IOError("the file was closed")
675
self.outchunks.append(chunk)
680
Set the 'closed' flag to True, explicitly marking that it has been
687
class ConsoleUITests(TestCase):
689
Test cases for L{ConsoleUI}.
694
Create a L{ConsoleUI} pointed at a L{FakeFile}.
696
self.fakeFile = FakeFile()
697
self.ui = ConsoleUI(self.openFile)
702
Return the current fake file.
707
def newFile(self, lines):
709
Create a new fake file (the next file that self.ui will open) with the
710
given list of lines to be returned from readline().
712
self.fakeFile = FakeFile()
713
self.fakeFile.inlines = lines
716
def test_promptYes(self):
718
L{ConsoleUI.prompt} writes a message to the console, then reads a line.
719
If that line is 'yes', then it returns a L{Deferred} that fires with
722
for okYes in ['yes', 'Yes', 'yes\n']:
723
self.newFile([okYes])
725
self.ui.prompt("Hello, world!").addCallback(l.append)
726
self.assertEqual(["Hello, world!"], self.fakeFile.outchunks)
727
self.assertEqual([True], l)
728
self.assertEqual(True, self.fakeFile.closed)
731
def test_promptNo(self):
733
L{ConsoleUI.prompt} writes a message to the console, then reads a line.
734
If that line is 'no', then it returns a L{Deferred} that fires with
737
for okNo in ['no', 'No', 'no\n']:
740
self.ui.prompt("Goodbye, world!").addCallback(l.append)
741
self.assertEqual(["Goodbye, world!"], self.fakeFile.outchunks)
742
self.assertEqual([False], l)
743
self.assertEqual(True, self.fakeFile.closed)
746
def test_promptRepeatedly(self):
748
L{ConsoleUI.prompt} writes a message to the console, then reads a line.
749
If that line is neither 'yes' nor 'no', then it says "Please enter
750
'yes' or 'no'" until it gets a 'yes' or a 'no', at which point it
751
returns a Deferred that answers either True or False.
753
self.newFile(['what', 'uh', 'okay', 'yes'])
755
self.ui.prompt("Please say something useful.").addCallback(l.append)
756
self.assertEqual([True], l)
757
self.assertEqual(self.fakeFile.outchunks,
758
["Please say something useful."] +
759
["Please type 'yes' or 'no': "] * 3)
760
self.assertEqual(True, self.fakeFile.closed)
761
self.newFile(['blah', 'stuff', 'feh', 'no'])
763
self.ui.prompt("Please say something negative.").addCallback(l.append)
764
self.assertEqual([False], l)
765
self.assertEqual(self.fakeFile.outchunks,
766
["Please say something negative."] +
767
["Please type 'yes' or 'no': "] * 3)
768
self.assertEqual(True, self.fakeFile.closed)
771
def test_promptOpenFailed(self):
773
If the C{opener} passed to L{ConsoleUI} raises an exception, that
774
exception will fail the L{Deferred} returned from L{ConsoleUI.prompt}.
778
ui = ConsoleUI(raiseIt)
779
d = ui.prompt("This is a test.")
780
return self.assertFailure(d, IOError)
785
L{ConsoleUI.warn} should output a message to the console object.
787
self.ui.warn("Test message.")
788
self.assertEqual(["Test message."], self.fakeFile.outchunks)
789
self.assertEqual(True, self.fakeFile.closed)
792
def test_warnOpenFailed(self):
794
L{ConsoleUI.warn} should log a traceback if the output can't be opened.
798
ui = ConsoleUI(raiseIt)
799
ui.warn("This message never makes it.")
800
self.assertEqual(len(self.flushLoggedErrors(ZeroDivisionError)), 1)
804
class FakeUI(object):
806
A fake UI object, adhering to the interface expected by
807
L{KnownHostsFile.verifyHostKey}
809
@ivar userWarnings: inputs provided to 'warn'.
811
@ivar promptDeferred: last result returned from 'prompt'.
813
@ivar promptText: the last input provided to 'prompt'.
817
self.userWarnings = []
818
self.promptDeferred = None
819
self.promptText = None
822
def prompt(self, text):
824
Issue the user an interactive prompt, which they can accept or deny.
826
self.promptText = text
827
self.promptDeferred = Deferred()
828
return self.promptDeferred
831
def warn(self, text):
833
Issue a non-interactive warning to the user.
835
self.userWarnings.append(text)
839
class FakeObject(object):
841
A fake object that can have some attributes. Used to fake
842
L{SSHClientTransport} and L{SSHClientFactory}.
846
class DefaultAPITests(TestCase):
848
The API in L{twisted.conch.client.default.verifyHostKey} is the integration
849
point between the code in the rest of conch and L{KnownHostsFile}.
852
def patchedOpen(self, fname, mode):
854
The patched version of 'open'; this returns a L{FakeFile} that the
855
instantiated L{ConsoleUI} can use.
857
self.assertEqual(fname, "/dev/tty")
858
self.assertEqual(mode, "r+b")
864
Patch 'open' in verifyHostKey.
866
self.fakeFile = FakeFile()
867
self.patch(default, "_open", self.patchedOpen)
868
self.hostsOption = self.mktemp()
869
knownHostsFile = KnownHostsFile(FilePath(self.hostsOption))
870
knownHostsFile.addHostKey("exists.example.com", Key.fromString(sampleKey))
871
knownHostsFile.addHostKey("4.3.2.1", Key.fromString(sampleKey))
872
knownHostsFile.save()
873
self.fakeTransport = FakeObject()
874
self.fakeTransport.factory = FakeObject()
875
self.options = self.fakeTransport.factory.options = {
876
'host': "exists.example.com",
877
'known-hosts': self.hostsOption
881
def test_verifyOKKey(self):
883
L{default.verifyHostKey} should return a L{Deferred} which fires with
884
C{1} when passed a host, IP, and key which already match the
885
known_hosts file it is supposed to check.
888
default.verifyHostKey(self.fakeTransport, "4.3.2.1", sampleKey,
889
"I don't care.").addCallback(l.append)
890
self.assertEqual([1], l)
893
def replaceHome(self, tempHome):
895
Replace the HOME environment variable until the end of the current
896
test, with the given new home-directory, so that L{os.path.expanduser}
897
will yield controllable, predictable results.
899
@param tempHome: the pathname to replace the HOME variable with.
901
@type tempHome: L{str}
903
oldHome = os.environ.get('HOME')
906
del os.environ['HOME']
908
os.environ['HOME'] = oldHome
909
self.addCleanup(cleanupHome)
910
os.environ['HOME'] = tempHome
913
def test_noKnownHostsOption(self):
915
L{default.verifyHostKey} should find your known_hosts file in
916
~/.ssh/known_hosts if you don't specify one explicitly on the command
920
tmpdir = self.mktemp()
921
oldHostsOption = self.hostsOption
922
hostsNonOption = FilePath(tmpdir).child(".ssh").child("known_hosts")
923
hostsNonOption.parent().makedirs()
924
FilePath(oldHostsOption).moveTo(hostsNonOption)
925
self.replaceHome(tmpdir)
926
self.options['known-hosts'] = None
927
default.verifyHostKey(self.fakeTransport, "4.3.2.1", sampleKey,
928
"I don't care.").addCallback(l.append)
929
self.assertEqual([1], l)
932
def test_verifyHostButNotIP(self):
934
L{default.verifyHostKey} should return a L{Deferred} which fires with
935
C{1} when passed a host which matches with an IP is not present in its
936
known_hosts file, and should also warn the user that it has added the
940
default.verifyHostKey(self.fakeTransport, "8.7.6.5", sampleKey,
941
"Fingerprint not required.").addCallback(l.append)
943
["Warning: Permanently added the RSA host key for IP address "
944
"'8.7.6.5' to the list of known hosts."],
945
self.fakeFile.outchunks)
946
self.assertEqual([1], l)
947
knownHostsFile = KnownHostsFile.fromPath(FilePath(self.hostsOption))
948
self.assertEqual(True, knownHostsFile.hasHostKey("8.7.6.5",
949
Key.fromString(sampleKey)))
952
def test_verifyQuestion(self):
954
L{default.verifyHostKey} should return a L{Default} which fires with
955
C{0} when passed a unknown host that the user refuses to acknowledge.
957
self.fakeTransport.factory.options['host'] = 'fake.example.com'
958
self.fakeFile.inlines.append("no")
959
d = default.verifyHostKey(
960
self.fakeTransport, "9.8.7.6", otherSampleKey, "No fingerprint!")
962
["The authenticity of host 'fake.example.com (9.8.7.6)' "
963
"can't be established.\n"
964
"RSA key fingerprint is "
965
"57:a1:c2:a1:07:a0:2b:f4:ce:b5:e5:b7:ae:cc:e1:99.\n"
966
"Are you sure you want to continue connecting (yes/no)? "],
967
self.fakeFile.outchunks)
968
return self.assertFailure(d, UserRejectedKey)
971
def test_verifyBadKey(self):
973
L{default.verifyHostKey} should return a L{Deferred} which fails with
974
L{HostKeyChanged} if the host key is incorrect.
976
d = default.verifyHostKey(
977
self.fakeTransport, "4.3.2.1", otherSampleKey,
978
"Again, not required.")
979
return self.assertFailure(d, HostKeyChanged)