1
# -*- test-case-name: twisted.conch.test.test_conch -*-
2
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
from twisted.cred import portal
8
from twisted.internet import reactor, defer, protocol
9
from twisted.internet.error import ProcessExitedAlready
10
from twisted.python import log, runtime
11
from twisted.trial import unittest
12
from twisted.conch.error import ConchError
14
from twisted.conch.scripts.conch import SSHSession as StdioInteractingSession
15
except ImportError, e:
16
StdioInteractingSession = None
20
from twisted.conch.test.test_ssh import ConchTestRealm
21
from twisted.python.procutils import which
23
from twisted.conch.test.keydata import publicRSA_openssh, privateRSA_openssh
24
from twisted.conch.test.keydata import publicDSA_openssh, privateDSA_openssh
26
from twisted.conch.test.test_ssh import Crypto, pyasn1
28
from twisted.conch.test.test_ssh import ConchTestServerFactory, \
29
ConchTestPublicKeyChecker
35
class StdioInteractingSessionTests(unittest.TestCase):
37
Tests for L{twisted.conch.scripts.conch.SSHSession}.
39
if StdioInteractingSession is None:
42
def test_eofReceived(self):
44
L{twisted.conch.scripts.conch.SSHSession.eofReceived} loses the
45
write half of its stdio connection.
50
def loseWriteConnection(self):
51
self.writeConnLost = True
54
channel = StdioInteractingSession()
57
self.assertTrue(stdio.writeConnLost)
61
class Echo(protocol.Protocol):
62
def connectionMade(self):
63
log.msg('ECHO CONNECTION MADE')
66
def connectionLost(self, reason):
67
log.msg('ECHO CONNECTION DONE')
70
def dataReceived(self, data):
71
self.transport.write(data)
73
self.transport.loseConnection()
77
class EchoFactory(protocol.Factory):
82
class ConchTestOpenSSHProcess(protocol.ProcessProtocol):
84
Test protocol for launching an OpenSSH client process.
86
@ivar deferred: Set by whatever uses this object. Accessed using
87
L{_getDeferred}, which destroys the value so the Deferred is not
88
fired twice. Fires when the process is terminated.
94
def _getDeferred(self):
95
d, self.deferred = self.deferred, None
99
def outReceived(self, data):
103
def processEnded(self, reason):
105
Called when the process has ended.
107
@param reason: a Failure giving the reason for the process' end.
109
if reason.value.exitCode != 0:
110
self._getDeferred().errback(
111
ConchError("exit code was not 0: %s" %
112
reason.value.exitCode))
114
buf = self.buf.replace('\r\n', '\n')
115
self._getDeferred().callback(buf)
119
class ConchTestForwardingProcess(protocol.ProcessProtocol):
121
Manages a third-party process which launches a server.
123
Uses L{ConchTestForwardingPort} to connect to the third-party server.
124
Once L{ConchTestForwardingPort} has disconnected, kill the process and fire
125
a Deferred with the data received by the L{ConchTestForwardingPort}.
127
@ivar deferred: Set by whatever uses this object. Accessed using
128
L{_getDeferred}, which destroys the value so the Deferred is not
129
fired twice. Fires when the process is terminated.
134
def __init__(self, port, data):
137
@param port: The port on which the third-party server is listening.
138
(it is assumed that the server is running on localhost).
141
@param data: This is sent to the third-party server. Must end with '\n'
142
in order to trigger a disconnect.
149
def _getDeferred(self):
150
d, self.deferred = self.deferred, None
154
def connectionMade(self):
160
Connect to the server, which is often a third-party process.
161
Tries to reconnect if it fails because we have no way of determining
162
exactly when the port becomes available for listening -- we can only
163
know when the process starts.
165
cc = protocol.ClientCreator(reactor, ConchTestForwardingPort, self,
167
d = cc.connectTCP('127.0.0.1', self.port)
168
d.addErrback(self._ebConnect)
172
def _ebConnect(self, f):
173
reactor.callLater(.1, self._connect)
176
def forwardingPortDisconnected(self, buffer):
178
The network connection has died; save the buffer of output
179
from the network and attempt to quit the process gracefully,
180
and then (after the reactor has spun) send it a KILL signal.
183
self.transport.write('\x03')
184
self.transport.loseConnection()
185
reactor.callLater(0, self._reallyDie)
188
def _reallyDie(self):
190
self.transport.signalProcess('KILL')
191
except ProcessExitedAlready:
195
def processEnded(self, reason):
197
Fire the Deferred at self.deferred with the data collected
198
from the L{ConchTestForwardingPort} connection, if any.
200
self._getDeferred().callback(self.buffer)
204
class ConchTestForwardingPort(protocol.Protocol):
206
Connects to server launched by a third-party process (managed by
207
L{ConchTestForwardingProcess}) sends data, then reports whatever it
208
received back to the L{ConchTestForwardingProcess} once the connection
213
def __init__(self, protocol, data):
215
@type protocol: L{ConchTestForwardingProcess}
216
@param protocol: The L{ProcessProtocol} which made this connection.
219
@param data: The data to be sent to the third-party server.
221
self.protocol = protocol
225
def connectionMade(self):
227
self.transport.write(self.data)
230
def dataReceived(self, data):
234
def connectionLost(self, reason):
235
self.protocol.forwardingPortDisconnected(self.buffer)
239
def _makeArgs(args, mod="conch"):
240
start = [sys.executable, '-c'
244
path = os.path.abspath(sys.argv[0])
245
while os.path.dirname(path) != path:
246
if os.path.basename(path).startswith('Twisted'):
247
sys.path.insert(0, path)
249
path = os.path.dirname(path)
251
from twisted.conch.scripts.%s import run
253
return start + list(args)
257
class ForwardingTestBase:
259
Template class for tests of the Conch server's ability to forward arbitrary
262
These tests are integration tests, not unit tests. They launch a Conch
263
server, a custom TCP server (just an L{EchoProtocol}) and then call
266
L{execute} is implemented by subclasses of L{ForwardingTestBase}. It should
267
cause an SSH client to connect to the Conch server, asking it to forward
268
data to the custom TCP server.
272
skip = "can't run w/o PyCrypto"
275
skip = "can't run w/o PyASN1"
277
def _createFiles(self):
278
for f in ['rsa_test','rsa_test.pub','dsa_test','dsa_test.pub',
280
if os.path.exists(f):
282
open('rsa_test','w').write(privateRSA_openssh)
283
open('rsa_test.pub','w').write(publicRSA_openssh)
284
open('dsa_test.pub','w').write(publicDSA_openssh)
285
open('dsa_test','w').write(privateDSA_openssh)
286
os.chmod('dsa_test', 33152)
287
os.chmod('rsa_test', 33152)
288
open('kh_test','w').write('127.0.0.1 '+publicRSA_openssh)
291
def _getFreePort(self):
294
port = s.getsockname()[1]
299
def _makeConchFactory(self):
301
Make a L{ConchTestServerFactory}, which allows us to start a
302
L{ConchTestServer} -- i.e. an actually listening conch.
304
realm = ConchTestRealm()
305
p = portal.Portal(realm)
306
p.registerChecker(ConchTestPublicKeyChecker())
307
factory = ConchTestServerFactory()
314
self.conchFactory = self._makeConchFactory()
315
self.conchFactory.expectedLoseConnection = 1
316
self.conchServer = reactor.listenTCP(0, self.conchFactory,
317
interface="127.0.0.1")
318
self.echoServer = reactor.listenTCP(0, EchoFactory())
319
self.echoPort = self.echoServer.getHost().port
324
self.conchFactory.proto.done = 1
325
except AttributeError:
328
self.conchFactory.proto.transport.loseConnection()
329
return defer.gatherResults([
330
defer.maybeDeferred(self.conchServer.stopListening),
331
defer.maybeDeferred(self.echoServer.stopListening)])
336
Test that we can use whatever client to send the command "echo goodbye"
337
to the Conch server. Make sure we receive "goodbye" back from the
340
d = self.execute('echo goodbye', ConchTestOpenSSHProcess())
341
return d.addCallback(self.assertEquals, 'goodbye\n')
344
def test_localToRemoteForwarding(self):
346
Test that we can use whatever client to forward a local port to a
347
specified port on the server.
349
localPort = self._getFreePort()
350
process = ConchTestForwardingProcess(localPort, 'test\n')
351
d = self.execute('', process,
352
sshArgs='-N -L%i:127.0.0.1:%i'
353
% (localPort, self.echoPort))
354
d.addCallback(self.assertEqual, 'test\n')
358
def test_remoteToLocalForwarding(self):
360
Test that we can use whatever client to forward a port from the server
363
localPort = self._getFreePort()
364
process = ConchTestForwardingProcess(localPort, 'test\n')
365
d = self.execute('', process,
366
sshArgs='-N -R %i:127.0.0.1:%i'
367
% (localPort, self.echoPort))
368
d.addCallback(self.assertEqual, 'test\n')
373
class OpenSSHClientTestCase(ForwardingTestBase, unittest.TestCase):
376
skip = "no ssh command-line client available"
378
def execute(self, remoteCommand, process, sshArgs=''):
380
Connects to the SSH server started in L{ForwardingTestBase.setUp} by
381
running the 'ssh' command line tool.
383
@type remoteCommand: str
384
@param remoteCommand: The command (with arguments) to run on the
387
@type process: L{ConchTestOpenSSHProcess}
390
@param sshArgs: Arguments to pass to the 'ssh' process.
392
@return: L{defer.Deferred}
394
process.deferred = defer.Deferred()
395
cmdline = ('ssh -2 -l testuser -p %i '
396
'-oUserKnownHostsFile=kh_test '
397
'-oPasswordAuthentication=no '
398
# Always use the RSA key, since that's the one in kh_test.
399
'-oHostKeyAlgorithms=ssh-rsa '
401
'-i dsa_test ') + sshArgs + \
402
' 127.0.0.1 ' + remoteCommand
403
port = self.conchServer.getHost().port
404
cmds = (cmdline % port).split()
405
reactor.spawnProcess(process, "ssh", cmds)
406
return process.deferred
410
class CmdLineClientTestCase(ForwardingTestBase, unittest.TestCase):
411
if runtime.platformType == 'win32':
412
skip = "can't run cmdline client on win32"
414
def execute(self, remoteCommand, process, sshArgs=''):
416
As for L{OpenSSHClientTestCase.execute}, except it runs the 'conch'
417
command line tool, not 'ssh'.
419
process.deferred = defer.Deferred()
420
port = self.conchServer.getHost().port
421
cmd = ('-p %i -l testuser '
422
'--known-hosts kh_test '
423
'--user-authentications publickey '
424
'--host-key-algorithms ssh-rsa '
427
'-v ') % port + sshArgs + \
428
' 127.0.0.1 ' + remoteCommand
429
cmds = _makeArgs(cmd.split())
431
env = os.environ.copy()
432
env['PYTHONPATH'] = os.pathsep.join(sys.path)
433
reactor.spawnProcess(process, sys.executable, cmds, env=env)
434
return process.deferred