1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
from twisted.conch.error import ConchError
6
from twisted.conch.ssh import common, keys, userauth, agent
7
from twisted.internet import defer, protocol, reactor
8
from twisted.python import log
12
import os, sys, base64, getpass
14
def verifyHostKey(transport, host, pubKey, fingerprint):
    """
    Decide whether the host key presented by the server should be trusted.

    The key is first looked up with isInKnownHosts().  A known, matching key
    is accepted; a known host whose key differs is rejected.  For an unknown
    host the user is asked on /dev/tty and, on 'yes', the key is appended to
    the known_hosts file.

    transport   -- the SSH transport; transport.factory.options and
                   transport.transport.getPeer().host are read from it.
    host        -- the host name the user asked to connect to.
    pubKey      -- the server's public key blob (NS-prefixed key type first).
    fingerprint -- printable fingerprint of pubKey, shown in the prompt.

    Returns a Deferred that succeeds with 1 when the key is accepted and
    fails with ConchError ('changed host key', 'bad host key' or '^C')
    otherwise.

    NOTE(review): this block was rebuilt from a copy of the file that had
    lost its indentation and a few statements (the ``try:`` before the
    ``except KeyboardInterrupt``, the third format argument, the test on the
    answer).  The restored statements are inferred from what remains --
    confirm against version control.
    """
    options = transport.factory.options
    goodKey = isInKnownHosts(host, pubKey, options)
    if goodKey == 1: # good key
        return defer.succeed(1)
    if goodKey == 2: # AAHHHHH changed
        return defer.fail(ConchError('changed host key'))

    peerHost = transport.transport.getPeer().host
    if host == peerHost:
        displayHost = khHost = host
    else:
        # Build both strings from the *original* host name.  Rebinding host
        # to the "name (ip)" display form first would leak that form, space
        # included, into the known_hosts entry and make it unmatchable.
        displayHost = '%s (%s)' % (host, peerHost)
        khHost = '%s,%s' % (host, peerHost)
    keyType = common.getNS(pubKey)[0]
    # Fall back to the raw type name instead of raising KeyError for a key
    # type that has no short display name.
    keyName = {'ssh-dss':'DSA', 'ssh-rsa':'RSA'}.get(keyType, keyType)

    # Talk to the controlling terminal directly: stdin/stdout may be the
    # channel carrying session data.  Single-argument print(...) behaves
    # exactly like the print statement.
    oldout, oldin = sys.stdout, sys.stdin
    tty = open('/dev/tty','r+')
    sys.stdin = sys.stdout = tty
    try:
        try:
            print("""The authenticity of host '%s' can't be established.
%s key fingerprint is %s.""" % (displayHost, keyName, fingerprint))
            ans = raw_input('Are you sure you want to continue connecting (yes/no)? ')
            while ans.lower() not in ('yes', 'no'):
                ans = raw_input("Please type 'yes' or 'no': ")
        except KeyboardInterrupt:
            return defer.fail(ConchError("^C"))
    finally:
        # Always hand the real streams back and release the tty, including
        # on the Ctrl-C path above.
        sys.stdout, sys.stdin = oldout, oldin
        tty.close()

    # Compare case-insensitively, like the loop above, so that 'NO' is not
    # silently treated as acceptance.
    if ans.lower() == 'no':
        print('Host key verification failed.')
        return defer.fail(ConchError('bad host key'))

    print("Warning: Permanently added '%s' (%s) to the list of known hosts." % (khHost, keyName))
    # Write to the same file isInKnownHosts() reads, otherwise a custom
    # known-hosts file would never learn the key.
    khPath = os.path.expanduser(options['known-hosts'] or '~/.ssh/known_hosts')
    encodedKey = base64.encodestring(pubKey).replace('\n', '')
    # 'a+' creates the file on first use ('r+' would raise IOError).
    known_hosts = open(khPath, 'a+')
    try:
        known_hosts.seek(0, 2)
        needsNewline = False
        if known_hosts.tell() > 0: # seek(-1, 2) is invalid on an empty file
            known_hosts.seek(-1, 2)
            needsNewline = known_hosts.read(1) != '\n'
        # stdio requires a seek between a read and a following write.
        known_hosts.seek(0, 2)
        if needsNewline:
            known_hosts.write('\n')
        known_hosts.write('%s %s %s\n' % (khHost, keyType, encodedKey))
    finally:
        known_hosts.close()
    return defer.succeed(1)
55
def isInKnownHosts(host, pubKey, options):
    """checks to see if host is in the known_hosts file for the user.
    returns 0 if it isn't, 1 if it is and is the same, 2 if it's changed.

    host    -- host name to look for in the comma-separated host field.
    pubKey  -- the server's public key blob (NS-prefixed key type first).
    options -- mapping; options['known-hosts'] names the file to read, and
               when it is empty ~/.ssh/known_hosts is used (the ~/.ssh
               directory is created if it does not exist yet).

    NOTE(review): this block was rebuilt from a copy of the file that had
    lost its indentation and several statements (the docstring terminator,
    the line split, the ``continue``/``return`` statements).  The restored
    control flow follows the return contract stated above -- confirm against
    version control.
    """
    import binascii

    keyType = common.getNS(pubKey)[0]
    retVal = 0

    if not options['known-hosts'] and not os.path.exists(os.path.expanduser('~/.ssh/')):
        print('Creating ~/.ssh directory...')
        os.mkdir(os.path.expanduser('~/.ssh'))
    kh_file = options['known-hosts'] or '~/.ssh/known_hosts'
    try:
        known_hosts = open(os.path.expanduser(kh_file))
    except IOError:
        # No readable known_hosts file: nothing is known about this host.
        return 0
    try:
        for line in known_hosts:
            fields = line.split()
            if len(fields) < 3: # blank or malformed line
                continue
            hosts, hostKeyType, encodedKey = fields[:3]
            if host not in hosts.split(','): # incorrect host
                continue
            if hostKeyType != keyType: # incorrect type of key
                continue
            try:
                decodedKey = base64.decodestring(encodedKey)
            except binascii.Error:
                # One corrupt entry must not abort the whole scan.
                continue
            if decodedKey == pubKey:
                return 1
            # Same host and key type but a different key.  Keep scanning: a
            # later line may still hold the matching key.
            retVal = 2
    finally:
        known_hosts.close()
    return retVal
89
class SSHUserAuthClient(userauth.SSHUserAuthClient):
    """
    User-authentication client that gets its credentials from an SSH agent,
    from the identity files listed in options.identitys, and from prompts on
    the controlling terminal (/dev/tty).

    NOTE(review): this class was rebuilt from a copy of the file that had
    lost its indentation and a number of statements (``try:`` lines,
    ``else:`` branches, attribute initialisation, some ``return``s).
    Statements flagged "restored" below are inferred from the surviving
    code -- confirm against version control.
    """

    def __init__(self, user, options, *args):
        """
        user    -- the user name to authenticate as.
        options -- client options; read as a mapping (options['noagent'])
                   and for its ``identitys`` attribute, which is defaulted
                   to the usual RSA/DSA identity files when empty.
        args    -- passed through to userauth.SSHUserAuthClient.__init__.
        """
        userauth.SSHUserAuthClient.__init__(self, user, *args)
        # restored: both attributes are read by the methods below.
        self.keyAgent = None
        self.options = options
        self.usedFiles = []
        if not options.identitys:
            options.identitys = ['~/.ssh/id_rsa', '~/.ssh/id_dsa']

    def serviceStarted(self):
        """
        Connect to the SSH agent named by $SSH_AUTH_SOCK first (unless
        options['noagent'] is set); otherwise start authenticating at once.
        """
        if 'SSH_AUTH_SOCK' in os.environ and not self.options['noagent']:
            log.msg('using agent')
            cc = protocol.ClientCreator(reactor, agent.SSHAgentClient)
            d = cc.connectUNIX(os.environ['SSH_AUTH_SOCK'])
            d.addCallback(self._setAgent)
            d.addErrback(self._ebSetAgent)
        else:
            # restored ``else``: on the agent path _ebSetAgent starts the
            # base service, so starting it here too would run it twice.
            userauth.SSHUserAuthClient.serviceStarted(self)

    def serviceStopped(self):
        """Drop the agent connection, if one was made."""
        if self.keyAgent: # restored guard: no agent means nothing to close
            self.keyAgent.transport.loseConnection()
            self.keyAgent = None

    def _setAgent(self, a):
        """
        Callback for the agent connection: remember the agent protocol
        ``a`` and ask it for its public keys.
        """
        self.keyAgent = a # restored: ``a`` was otherwise unused
        d = self.keyAgent.getPublicKeys()
        # Continue with authentication whether or not the key listing works.
        d.addBoth(self._ebSetAgent)
        return d

    def _ebSetAgent(self, f):
        """Start the real authentication service; ``f`` is ignored."""
        userauth.SSHUserAuthClient.serviceStarted(self)

    def _getPassword(self, prompt):
        """
        Read a password from /dev/tty without echo and return it.

        Raises ConchError('PEBKAC') if the user interrupts the prompt or the
        terminal cannot be used.
        """
        try:
            tty = open('/dev/tty','r+')
            oldout, oldin = sys.stdout, sys.stdin
            sys.stdin = sys.stdout = tty
            try:
                return getpass.getpass(prompt)
            finally:
                # Restore the real streams and release the tty even when the
                # prompt is interrupted.
                sys.stdout, sys.stdin = oldout, oldin
                tty.close()
        except (KeyboardInterrupt, IOError):
            raise ConchError('PEBKAC')

    def getPassword(self, prompt = None):
        """
        Ask for the account password.

        prompt -- text to show; defaults to "user@host's password: ".

        Returns a Deferred firing with the password, or failing with
        ConchError('PEBKAC') when the prompt was aborted.
        """
        if not prompt: # restored: honour a caller-supplied prompt
            prompt = "%s@%s's password: " % (self.user, self.transport.transport.getPeer().host)
        try:
            p = self._getPassword(prompt)
        except ConchError:
            # restored: report the abort through the Deferred, as the other
            # methods of this class do, rather than raising synchronously.
            return defer.fail(ConchError('PEBKAC'))
        return defer.succeed(p)

    def getPublicKey(self):
        """
        Return the next public key blob to offer, or None when there are no
        more candidates.

        The agent is asked first; after that each not-yet-tried entry of
        options.identitys is consumed in turn (recorded in self.usedFiles).
        """
        if self.keyAgent:
            blob = self.keyAgent.getPublicKey()
            if blob:
                return blob
        files = [x for x in self.options.identitys if x not in self.usedFiles]
        log.msg(str(self.options.identitys))
        if not files:
            return None
        identity = files[0]
        self.usedFiles.append(identity)
        # NOTE(review): the identity entries are loaded as *private* keys by
        # getPrivateKey(); the public half is presumably kept next to it in
        # '<identity>.pub' (OpenSSH layout) -- confirm.
        pubPath = os.path.expanduser(identity) + '.pub'
        if not os.path.exists(pubPath):
            return self.getPublicKey() # try again
        try:
            return keys.getPublicKeyString(pubPath)
        except Exception:
            # Deliberately best-effort: an unreadable or malformed key file
            # just means moving on to the next identity.
            return self.getPublicKey() # try again

    def signData(self, publicKey, signData):
        """
        Sign ``signData`` for ``publicKey``: with the agent while no identity
        file has been tried yet, otherwise via the base class.
        """
        if not self.usedFiles: # agent key
            return self.keyAgent.signData(publicKey, signData)
        return userauth.SSHUserAuthClient.signData(self, publicKey, signData)

    def getPrivateKey(self):
        """
        Load the private key for the most recently offered identity file.

        Returns a Deferred firing with the key object, a Deferred failing
        with ConchError('bad password') when no passphrase worked, or None
        when the file does not exist.
        """
        path = os.path.expanduser(self.usedFiles[-1])
        if not os.path.exists(path):
            return None # restored -- TODO confirm the caller expects None
        try:
            return defer.succeed(keys.getPrivateKeyObject(path))
        except keys.BadKeyError:
            # sys.exc_info() instead of binding the exception in the except
            # clause, so the clause parses on every Python version.
            err = sys.exc_info()[1]
            if err.args[0] != 'encrypted key with no passphrase':
                raise
            prompt = "Enter passphrase for key '%s': " % \
                     self.usedFiles[-1]
            # NOTE(review): the number of attempts is not visible in the
            # damaged source; three matches the OpenSSH client -- confirm.
            for attempt in range(3):
                try:
                    p = self._getPassword(prompt)
                    return defer.succeed(keys.getPrivateKeyObject(path, passphrase = p))
                except (keys.BadKeyError, ConchError):
                    pass
            return defer.fail(ConchError('bad password'))
        except KeyboardInterrupt:
            # NOTE(review): the handler body is missing from the damaged
            # source; stopping the reactor ends the client on Ctrl-C --
            # confirm.
            reactor.stop()

    def getGenericAnswers(self, name, instruction, prompts):
        """
        Answer a keyboard-interactive request on /dev/tty.

        name, instruction -- shown before the prompts when non-empty
                             (restored -- confirm).
        prompts           -- iterable of (prompt, echo) pairs; the answer is
                             read with echo when ``echo`` is true and hidden
                             otherwise.

        Returns a Deferred firing with the list of answers, in order.
        """
        responses = []
        oldout, oldin = sys.stdout, sys.stdin
        tty = open('/dev/tty','r+')
        sys.stdin = sys.stdout = tty
        try:
            if name:
                print(name)
            if instruction:
                print(instruction)
            for prompt, echo in prompts:
                if echo:
                    responses.append(raw_input(prompt))
                else:
                    responses.append(getpass.getpass(prompt))
        finally:
            sys.stdout, sys.stdin = oldout, oldin
            tty.close()
        return defer.succeed(responses)