38
42
return paramiko.AUTH_SUCCESSFUL
39
43
return paramiko.AUTH_FAILED
45
def check_auth_publickey(self, username, key):
46
if (key.get_name() == 'ssh-dss') and (hexlify(key.get_fingerprint()) == '4478f0b9a23cc5182009ff755bc1d26c'):
47
return paramiko.AUTH_SUCCESSFUL
48
return paramiko.AUTH_FAILED
41
50
def check_channel_request(self, kind, chanid):
42
51
return paramiko.OPEN_SUCCEEDED
109
def test_2_auto_add_policy(self):
119
def test_2_client_dsa(self):
121
verify that SSHClient works with a DSA key.
123
host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
124
public_host_key = paramiko.RSAKey(data=str(host_key))
126
self.tc = paramiko.SSHClient()
127
self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key)
128
self.tc.connect(self.addr, self.port, username='slowdive', key_filename='tests/test_dss.key')
131
self.assert_(self.event.isSet())
132
self.assert_(self.ts.is_active())
133
self.assertEquals('slowdive', self.ts.get_username())
134
self.assertEquals(True, self.ts.is_authenticated())
136
stdin, stdout, stderr = self.tc.exec_command('yes')
137
schan = self.ts.accept(1.0)
139
schan.send('Hello there.\n')
140
schan.send_stderr('This is on stderr.\n')
143
self.assertEquals('Hello there.\n', stdout.readline())
144
self.assertEquals('', stdout.readline())
145
self.assertEquals('This is on stderr.\n', stderr.readline())
146
self.assertEquals('', stderr.readline())
152
def test_3_auto_add_policy(self):
111
154
verify that SSHClient's AutoAddPolicy works.
125
168
self.assertEquals(True, self.ts.is_authenticated())
126
169
self.assertEquals(1, len(self.tc.get_host_keys()))
127
170
self.assertEquals(public_host_key, self.tc.get_host_keys()[self.addr]['ssh-rsa'])
172
def test_4_cleanup(self):
174
verify that when an SSHClient is collected, its transport (and the
175
transport's packetizer) is closed.
177
host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
178
public_host_key = paramiko.RSAKey(data=str(host_key))
180
self.tc = paramiko.SSHClient()
181
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
182
self.assertEquals(0, len(self.tc.get_host_keys()))
183
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
186
self.assert_(self.event.isSet())
187
self.assert_(self.ts.is_active())
189
p = weakref.ref(self.tc._transport.packetizer)
190
self.assert_(p() is not None)
192
# hrm, sometimes p isn't cleared right away. why is that?
194
while (time.time() - st < 5.0) and (p() is not None):
196
self.assert_(p() is None)