1
# -*- test-case-name: twisted.conch.test.test_knownhosts -*-
2
# Copyright (c) 2008-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
An implementation of the OpenSSH known_hosts database.
11
from binascii import Error as DecodeError, b2a_base64
13
from zope.interface import implements
15
from Crypto.Hash.HMAC import HMAC
16
from Crypto.Hash import SHA
18
from twisted.python.randbytes import secureRandom
20
from twisted.internet import defer
22
from twisted.python import log
23
from twisted.conch.interfaces import IKnownHostEntry
24
from twisted.conch.error import HostKeyChanged, UserRejectedKey, InvalidEntry
25
from twisted.conch.ssh.keys import Key, BadKeyError
30
Encode a binary string as base64 with no trailing newline.
32
return b2a_base64(s).strip()
36
def _extractCommon(string):
38
Extract common elements of base64 keys from an entry in a hosts file.
40
@return: a 4-tuple of hostname data (L{str}), ssh key type (L{str}), key
41
(L{Key}), and comment (L{str} or L{None}). The hostname data is simply the
42
beginning of the line up to the first occurrence of whitespace.
44
elements = string.split(None, 2)
45
if len(elements) != 3:
47
hostnames, keyType, keyAndComment = elements
48
splitkey = keyAndComment.split(None, 1)
49
if len(splitkey) == 2:
50
keyString, comment = splitkey
51
comment = comment.rstrip("\n")
53
keyString = splitkey[0]
55
key = Key.fromString(keyString.decode('base64'))
56
return hostnames, keyType, key, comment
60
class _BaseEntry(object):
62
Abstract base of both hashed and non-hashed entry objects, since they
63
represent keys and key types the same way.
65
@ivar keyType: The type of the key; either ssh-dss or ssh-rsa.
68
@ivar publicKey: The server public key indicated by this line.
69
@type publicKey: L{twisted.conch.ssh.keys.Key}
71
@ivar comment: Trailing garbage after the key line.
75
def __init__(self, keyType, publicKey, comment):
76
self.keyType = keyType
77
self.publicKey = publicKey
78
self.comment = comment
81
def matchesKey(self, keyObject):
83
Check to see if this entry matches a given key object.
85
@type keyObject: L{Key}
89
return self.publicKey == keyObject
93
class PlainEntry(_BaseEntry):
95
A L{PlainEntry} is a representation of a plain-text entry in a known_hosts
98
@ivar _hostnames: the list of all host-names associated with this entry.
99
@type _hostnames: L{list} of L{str}
102
implements(IKnownHostEntry)
104
def __init__(self, hostnames, keyType, publicKey, comment):
105
self._hostnames = hostnames
106
super(PlainEntry, self).__init__(keyType, publicKey, comment)
109
def fromString(cls, string):
111
Parse a plain-text entry in a known_hosts file, and return a
112
corresponding L{PlainEntry}.
114
@param string: a space-separated string formatted like "hostname
115
key-type base64-key-data comment".
119
@raise DecodeError: if the key is not valid encoded as valid base64.
121
@raise InvalidEntry: if the entry does not have the right number of
122
elements and is therefore invalid.
124
@raise BadKeyError: if the key, once decoded from base64, is not
127
@return: an IKnownHostEntry representing the hostname and key in the
130
@rtype: L{PlainEntry}
132
hostnames, keyType, key, comment = _extractCommon(string)
133
self = cls(hostnames.split(","), keyType, key, comment)
136
fromString = classmethod(fromString)
139
def matchesHost(self, hostname):
141
Check to see if this entry matches a given hostname.
143
@type hostname: L{str}
147
return hostname in self._hostnames
152
Implement L{IKnownHostEntry.toString} by recording the comma-separated
153
hostnames, key type, and base-64 encoded key.
155
fields = [','.join(self._hostnames),
157
_b64encode(self.publicKey.blob())]
158
if self.comment is not None:
159
fields.append(self.comment)
160
return ' '.join(fields)
163
class UnparsedEntry(object):
165
L{UnparsedEntry} is an entry in a L{KnownHostsFile} which can't actually be
166
parsed; therefore it matches no keys and no hosts.
169
implements(IKnownHostEntry)
171
def __init__(self, string):
173
Create an unparsed entry from a line in a known_hosts file which cannot
176
self._string = string
179
def matchesHost(self, hostname):
181
Always returns False.
186
def matchesKey(self, key):
188
Always returns False.
195
Returns the input line, without its newline if one was given.
197
return self._string.rstrip("\n")
201
def _hmacedString(key, string):
203
Return the SHA-1 HMAC hash of the given key and string.
205
hash = HMAC(key, digestmod=SHA)
211
class HashedEntry(_BaseEntry):
213
A L{HashedEntry} is a representation of an entry in a known_hosts file
214
where the hostname has been hashed and salted.
216
@ivar _hostSalt: the salt to combine with a hostname for hashing.
218
@ivar _hostHash: the hashed representation of the hostname.
220
@cvar MAGIC: the 'hash magic' string used to identify a hashed line in a
221
known_hosts file as opposed to a plaintext one.
224
implements(IKnownHostEntry)
228
def __init__(self, hostSalt, hostHash, keyType, publicKey, comment):
229
self._hostSalt = hostSalt
230
self._hostHash = hostHash
231
super(HashedEntry, self).__init__(keyType, publicKey, comment)
234
def fromString(cls, string):
236
Load a hashed entry from a string representing a line in a known_hosts
239
@raise DecodeError: if the key, the hostname, or the is not valid
240
encoded as valid base64
242
@raise InvalidEntry: if the entry does not have the right number of
243
elements and is therefore invalid, or the host/hash portion contains
244
more items than just the host and hash.
246
@raise BadKeyError: if the key, once decoded from base64, is not
249
stuff, keyType, key, comment = _extractCommon(string)
250
saltAndHash = stuff[len(cls.MAGIC):].split("|")
251
if len(saltAndHash) != 2:
253
hostSalt, hostHash = saltAndHash
254
self = cls(hostSalt.decode("base64"), hostHash.decode("base64"),
255
keyType, key, comment)
258
fromString = classmethod(fromString)
261
def matchesHost(self, hostname):
263
Implement L{IKnownHostEntry.matchesHost} to compare the hash of the
264
input to the stored hash.
266
return (_hmacedString(self._hostSalt, hostname) == self._hostHash)
271
Implement L{IKnownHostEntry.toString} by base64-encoding the salt, host
274
fields = [self.MAGIC + '|'.join([_b64encode(self._hostSalt),
275
_b64encode(self._hostHash)]),
277
_b64encode(self.publicKey.blob())]
278
if self.comment is not None:
279
fields.append(self.comment)
280
return ' '.join(fields)
284
class KnownHostsFile(object):
286
A structured representation of an OpenSSH-format ~/.ssh/known_hosts file.
288
@ivar _entries: a list of L{IKnownHostEntry} providers.
290
@ivar _savePath: the L{FilePath} to save new entries to.
293
def __init__(self, savePath):
295
Create a new, empty KnownHostsFile.
297
You want to use L{KnownHostsFile.fromPath} to parse one of these.
300
self._savePath = savePath
303
def hasHostKey(self, hostname, key):
305
@return: True if the given hostname and key are present in this file,
306
False if they are not.
310
@raise HostKeyChanged: if the host key found for the given hostname
311
does not match the given key.
313
for lineidx, entry in enumerate(self._entries):
314
if entry.matchesHost(hostname):
315
if entry.matchesKey(key):
318
raise HostKeyChanged(entry, self._savePath, lineidx + 1)
322
def verifyHostKey(self, ui, hostname, ip, key):
324
Verify the given host key for the given IP and host, asking for
325
confirmation from, and notifying, the given UI about changes to this
328
@param ui: The user interface to request an IP address from.
330
@param hostname: The hostname that the user requested to connect to.
332
@param ip: The string representation of the IP address that is actually
335
@param key: The public key of the server.
337
@return: a L{Deferred} that fires with True when the key has been
338
verified, or fires with an errback when the key either cannot be
339
verified or has changed.
343
hhk = defer.maybeDeferred(self.hasHostKey, hostname, key)
344
def gotHasKey(result):
346
if not self.hasHostKey(ip, key):
347
ui.warn("Warning: Permanently added the %s host key for "
348
"IP address '%s' to the list of known hosts." %
350
self.addHostKey(ip, key)
354
def promptResponse(response):
356
self.addHostKey(hostname, key)
357
self.addHostKey(ip, key)
361
raise UserRejectedKey()
363
"The authenticity of host '%s (%s)' "
364
"can't be established.\n"
365
"RSA key fingerprint is %s.\n"
366
"Are you sure you want to continue connecting (yes/no)? " %
367
(hostname, ip, key.fingerprint())).addCallback(promptResponse)
368
return hhk.addCallback(gotHasKey)
371
def addHostKey(self, hostname, key):
373
Add a new L{HashedEntry} to the key database.
375
Note that you still need to call L{KnownHostsFile.save} if you wish
376
these changes to be persisted.
378
@return: the L{HashedEntry} that was added.
380
salt = secureRandom(20)
381
keyType = "ssh-" + key.type().lower()
382
entry = HashedEntry(salt, _hmacedString(salt, hostname),
384
self._entries.append(entry)
390
Save this L{KnownHostsFile} to the path it was loaded from.
392
p = self._savePath.parent()
395
self._savePath.setContent('\n'.join(
396
[entry.toString() for entry in self._entries]) + "\n")
399
def fromPath(cls, path):
401
@param path: A path object to use for both reading contents from and
404
@type path: L{FilePath}
412
if line.startswith(HashedEntry.MAGIC):
413
entry = HashedEntry.fromString(line)
416
entry = PlainEntry.fromString(line)
417
except (DecodeError, InvalidEntry, BadKeyError):
418
entry = UnparsedEntry(line)
419
self._entries.append(entry)
422
fromPath = classmethod(fromPath)
425
class ConsoleUI(object):
427
A UI object that can ask true/false questions and post notifications on the
428
console, to be used during key verification.
430
@ivar opener: a no-argument callable which should open a console file-like
431
object to be used for reading and writing.
434
def __init__(self, opener):
438
def prompt(self, text):
440
Write the given text as a prompt to the console output, then read a
441
result from the console input.
443
@return: a L{Deferred} which fires with L{True} when the user answers
444
'yes' and L{False} when the user answers 'no'. It may errback if there
447
d = defer.succeed(None)
452
answer = f.readline().strip().lower()
460
f.write("Please type 'yes' or 'no': ")
461
return d.addCallback(body)
464
def warn(self, text):
466
Notify the user (non-interactively) of the provided text, by writing it