~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/conch/test/test_session.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2007-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for the 'session' channel implementation in twisted.conch.ssh.session.
 
6
 
 
7
See also RFC 4254.
 
8
"""
 
9
 
 
10
import os, signal, sys, struct
 
11
 
 
12
from zope.interface import implements
 
13
 
 
14
from twisted.internet.error import ProcessTerminated, ProcessDone
 
15
from twisted.python.failure import Failure
 
16
from twisted.conch.ssh import common, session, connection
 
17
from twisted.internet import defer, protocol, error
 
18
from twisted.python import components, failure
 
19
from twisted.trial import unittest
 
20
 
 
21
 
 
22
 
 
23
class SubsystemOnlyAvatar(object):
 
24
    """
 
25
    A stub class representing an avatar that is only useful for
 
26
    getting a subsystem.
 
27
    """
 
28
 
 
29
 
 
30
    def lookupSubsystem(self, name, data):
 
31
        """
 
32
        If the other side requests the 'subsystem' subsystem, allow it by
 
33
        returning a MockProtocol to implement it.  Otherwise, return
 
34
        None which is interpreted by SSHSession as a failure.
 
35
        """
 
36
        if name == 'subsystem':
 
37
            return MockProtocol()
 
38
 
 
39
 
 
40
 
 
41
class StubAvatar:
 
42
    """
 
43
    A stub class representing the avatar representing the authenticated user.
 
44
    It implements the I{ISession} interface.
 
45
    """
 
46
 
 
47
 
 
48
    def lookupSubsystem(self, name, data):
 
49
        """
 
50
        If the user requests the TestSubsystem subsystem, connect them to a
 
51
        MockProtocol.  If they request neither, then None is returned which is
 
52
        interpreted by SSHSession as a failure.
 
53
        """
 
54
        if name == 'TestSubsystem':
 
55
            self.subsystem = MockProtocol()
 
56
            self.subsystem.packetData = data
 
57
            return self.subsystem
 
58
 
 
59
 
 
60
 
 
61
class StubSessionForStubAvatar(object):
 
62
    """
 
63
    A stub ISession implementation for our StubAvatar.  The instance
 
64
    variables generally keep track of method invocations so that we can test
 
65
    that the methods were called.
 
66
 
 
67
    @ivar avatar: the L{StubAvatar} we are adapting.
 
68
    @ivar ptyRequest: if present, the terminal, window size, and modes passed
 
69
        to the getPty method.
 
70
    @ivar windowChange: if present, the window size passed to the
 
71
        windowChangned method.
 
72
    @ivar shellProtocol: if present, the L{SSHSessionProcessProtocol} passed
 
73
        to the openShell method.
 
74
    @ivar shellTransport: if present, the L{EchoTransport} connected to
 
75
        shellProtocol.
 
76
    @ivar execProtocol: if present, the L{SSHSessionProcessProtocol} passed
 
77
        to the execCommand method.
 
78
    @ivar execTransport: if present, the L{EchoTransport} connected to
 
79
        execProtocol.
 
80
    @ivar execCommandLine: if present, the command line passed to the
 
81
        execCommand method.
 
82
    @ivar gotEOF: if present, an EOF message was received.
 
83
    @ivar gotClosed: if present, a closed message was received.
 
84
    """
 
85
 
 
86
 
 
87
    implements(session.ISession)
 
88
 
 
89
 
 
90
    def __init__(self, avatar):
 
91
        """
 
92
        Store the avatar we're adapting.
 
93
        """
 
94
        self.avatar = avatar
 
95
        self.shellProtocol = None
 
96
 
 
97
 
 
98
    def getPty(self, terminal, window, modes):
 
99
        """
 
100
        If the terminal is 'bad', fail.  Otherwise, store the information in
 
101
        the ptyRequest variable.
 
102
        """
 
103
        if terminal != 'bad':
 
104
            self.ptyRequest = (terminal, window, modes)
 
105
        else:
 
106
            raise RuntimeError('not getting a pty')
 
107
 
 
108
 
 
109
    def windowChanged(self, window):
 
110
        """
 
111
        If all the window sizes are 0, fail.  Otherwise, store the size in the
 
112
        windowChange variable.
 
113
        """
 
114
        if window == (0, 0, 0, 0):
 
115
            raise RuntimeError('not changing the window size')
 
116
        else:
 
117
            self.windowChange = window
 
118
 
 
119
 
 
120
    def openShell(self, pp):
 
121
        """
 
122
        If we have gotten a shell request before, fail.  Otherwise, store the
 
123
        process protocol in the shellProtocol variable, connect it to the
 
124
        EchoTransport and store that as shellTransport.
 
125
        """
 
126
        if self.shellProtocol is not None:
 
127
            raise RuntimeError('not getting a shell this time')
 
128
        else:
 
129
            self.shellProtocol = pp
 
130
            self.shellTransport = EchoTransport(pp)
 
131
 
 
132
 
 
133
    def execCommand(self, pp, command):
 
134
        """
 
135
        If the command is 'true', store the command, the process protocol, and
 
136
        the transport we connect to the process protocol.  Otherwise, just
 
137
        store the command and raise an error.
 
138
        """
 
139
        self.execCommandLine = command
 
140
        if command == 'success':
 
141
            self.execProtocol = pp
 
142
        elif command[:6] == 'repeat':
 
143
            self.execProtocol = pp
 
144
            self.execTransport = EchoTransport(pp)
 
145
            pp.outReceived(command[7:])
 
146
        else:
 
147
            raise RuntimeError('not getting a command')
 
148
 
 
149
 
 
150
    def eofReceived(self):
 
151
        """
 
152
        Note that EOF has been received.
 
153
        """
 
154
        self.gotEOF = True
 
155
 
 
156
 
 
157
    def closed(self):
 
158
        """
 
159
        Note that close has been received.
 
160
        """
 
161
        self.gotClosed = True
 
162
 
 
163
 
 
164
 
 
165
components.registerAdapter(StubSessionForStubAvatar, StubAvatar,
 
166
        session.ISession)
 
167
 
 
168
 
 
169
 
 
170
 
 
171
class MockProcessProtocol(protocol.ProcessProtocol):
 
172
    """
 
173
    A mock ProcessProtocol which echoes back data sent to it and
 
174
    appends a tilde.  The tilde is appended so the tests can verify that
 
175
    we received and processed the data.
 
176
 
 
177
    @ivar packetData: C{str} of data to be sent when the connection is made.
 
178
    @ivar data: a C{str} of data received.
 
179
    @ivar err: a C{str} of error data received.
 
180
    @ivar inConnectionOpen: True if the input side is open.
 
181
    @ivar outConnectionOpen: True if the output side is open.
 
182
    @ivar errConnectionOpen: True if the error side is open.
 
183
    @ivar ended: False if the protocol has not ended, a C{Failure} if the
 
184
        process has ended.
 
185
    """
 
186
    packetData = ''
 
187
 
 
188
 
 
189
    def connectionMade(self):
 
190
        """
 
191
        Set up variables.
 
192
        """
 
193
        self.data = ''
 
194
        self.err = ''
 
195
        self.inConnectionOpen = True
 
196
        self.outConnectionOpen = True
 
197
        self.errConnectionOpen = True
 
198
        self.ended = False
 
199
        if self.packetData:
 
200
            self.outReceived(self.packetData)
 
201
 
 
202
 
 
203
    def outReceived(self, data):
 
204
        """
 
205
        Data was received.  Store it and echo it back with a tilde.
 
206
        """
 
207
        self.data += data
 
208
        if self.transport is not None:
 
209
            self.transport.write(data + '~')
 
210
 
 
211
 
 
212
    def errReceived(self, data):
 
213
        """
 
214
        Error data was received.  Store it and echo it back backwards.
 
215
        """
 
216
        self.err += data
 
217
        self.transport.write(data[::-1])
 
218
 
 
219
 
 
220
    def inConnectionLost(self):
 
221
        """
 
222
        Close the input side.
 
223
        """
 
224
        self.inConnectionOpen = False
 
225
 
 
226
 
 
227
    def outConnectionLost(self):
 
228
        """
 
229
        Close the output side.
 
230
        """
 
231
        self.outConnectionOpen = False
 
232
 
 
233
 
 
234
    def errConnectionLost(self):
 
235
        """
 
236
        Close the error side.
 
237
        """
 
238
        self.errConnectionOpen = False
 
239
 
 
240
 
 
241
    def processEnded(self, reason):
 
242
        """
 
243
        End the process and store the reason.
 
244
        """
 
245
        self.ended = reason
 
246
 
 
247
 
 
248
 
 
249
class EchoTransport:
 
250
    """
 
251
    A transport for a ProcessProtocol which echos data that is sent to it with
 
252
    a Window newline (CR LF) appended to it.  If a null byte is in the data,
 
253
    disconnect.  When we are asked to disconnect, disconnect the
 
254
    C{ProcessProtocol} with a 0 exit code.
 
255
 
 
256
    @ivar proto: the C{ProcessProtocol} connected to us.
 
257
    @ivar data: a C{str} of data written to us.
 
258
    """
 
259
 
 
260
 
 
261
    def __init__(self, processProtocol):
 
262
        """
 
263
        Initialize our instance variables.
 
264
 
 
265
        @param processProtocol: a C{ProcessProtocol} to connect to ourself.
 
266
        """
 
267
        self.proto = processProtocol
 
268
        self.closed = False
 
269
        self.data = ''
 
270
        processProtocol.makeConnection(self)
 
271
 
 
272
 
 
273
    def write(self, data):
 
274
        """
 
275
        We got some data.  Give it back to our C{ProcessProtocol} with
 
276
        a newline attached.  Disconnect if there's a null byte.
 
277
        """
 
278
        self.data += data
 
279
        self.proto.outReceived(data)
 
280
        self.proto.outReceived('\r\n')
 
281
        if '\x00' in data: # mimic 'exit' for the shell test
 
282
            self.loseConnection()
 
283
 
 
284
 
 
285
    def loseConnection(self):
 
286
        """
 
287
        If we're asked to disconnect (and we haven't already) shut down
 
288
        the C{ProcessProtocol} with a 0 exit code.
 
289
        """
 
290
        if self.closed:
 
291
            return
 
292
        self.closed = 1
 
293
        self.proto.inConnectionLost()
 
294
        self.proto.outConnectionLost()
 
295
        self.proto.errConnectionLost()
 
296
        self.proto.processEnded(failure.Failure(
 
297
                error.ProcessTerminated(0, None, None)))
 
298
 
 
299
 
 
300
 
 
301
class MockProtocol(protocol.Protocol):
 
302
    """
 
303
    A sample Protocol which stores the data passed to it.
 
304
 
 
305
    @ivar packetData: a C{str} of data to be sent when the connection is made.
 
306
    @ivar data: a C{str} of the data passed to us.
 
307
    @ivar open: True if the channel is open.
 
308
    @ivar reason: if not None, the reason the protocol was closed.
 
309
    """
 
310
    packetData = ''
 
311
 
 
312
 
 
313
    def connectionMade(self):
 
314
        """
 
315
        Set up the instance variables.  If we have any packetData, send it
 
316
        along.
 
317
        """
 
318
 
 
319
        self.data = ''
 
320
        self.open = True
 
321
        self.reason = None
 
322
        if self.packetData:
 
323
            self.dataReceived(self.packetData)
 
324
 
 
325
 
 
326
    def dataReceived(self, data):
 
327
        """
 
328
        Store the received data and write it back with a tilde appended.
 
329
        The tilde is appended so that the tests can verify that we processed
 
330
        the data.
 
331
        """
 
332
        self.data += data
 
333
        if self.transport is not None:
 
334
            self.transport.write(data + '~')
 
335
 
 
336
 
 
337
    def connectionLost(self, reason):
 
338
        """
 
339
        Close the protocol and store the reason.
 
340
        """
 
341
        self.open = False
 
342
        self.reason = reason
 
343
 
 
344
 
 
345
 
 
346
class StubConnection(object):
 
347
    """
 
348
    A stub for twisted.conch.ssh.connection.SSHConnection.  Record the data
 
349
    that channels send, and when they try to close the connection.
 
350
 
 
351
    @ivar data: a C{dict} mapping C{SSHChannel}s to a C{list} of C{str} of data
 
352
        they sent.
 
353
    @ivar extData: a C{dict} mapping L{SSHChannel}s to a C{list} of C{tuple} of
 
354
        (C{int}, C{str}) of extended data they sent.
 
355
    @ivar requests: a C{dict} mapping L{SSHChannel}s to a C{list} of C{tuple}
 
356
        of (C{str}, C{str}) of channel requests they made.
 
357
    @ivar eofs: a C{dict} mapping L{SSHChannel}s to C{true} if they have sent
 
358
        an EOF.
 
359
    @ivar closes: a C{dict} mapping L{SSHChannel}s to C{true} if they have sent
 
360
        a close.
 
361
    """
 
362
 
 
363
 
 
364
    def __init__(self):
 
365
        """
 
366
        Initialize our instance variables.
 
367
        """
 
368
        self.data = {}
 
369
        self.extData = {}
 
370
        self.requests = {}
 
371
        self.eofs = {}
 
372
        self.closes = {}
 
373
 
 
374
 
 
375
    def logPrefix(self):
 
376
        """
 
377
        Return our logging prefix.
 
378
        """
 
379
        return "MockConnection"
 
380
 
 
381
 
 
382
    def sendData(self, channel, data):
 
383
        """
 
384
        Record the sent data.
 
385
        """
 
386
        self.data.setdefault(channel, []).append(data)
 
387
 
 
388
 
 
389
    def sendExtendedData(self, channel, type, data):
 
390
        """
 
391
        Record the sent extended data.
 
392
        """
 
393
        self.extData.setdefault(channel, []).append((type, data))
 
394
 
 
395
 
 
396
    def sendRequest(self, channel, request, data, wantReply=False):
 
397
        """
 
398
        Record the sent channel request.
 
399
        """
 
400
        self.requests.setdefault(channel, []).append((request, data,
 
401
            wantReply))
 
402
        if wantReply:
 
403
            return defer.succeed(None)
 
404
 
 
405
 
 
406
    def sendEOF(self, channel):
 
407
        """
 
408
        Record the sent EOF.
 
409
        """
 
410
        self.eofs[channel] = True
 
411
 
 
412
 
 
413
    def sendClose(self, channel):
 
414
        """
 
415
        Record the sent close.
 
416
        """
 
417
        self.closes[channel] = True
 
418
 
 
419
 
 
420
 
 
421
 
 
422
 
 
423
 
 
424
class StubTransport:
 
425
    """
 
426
    A stub transport which records the data written.
 
427
 
 
428
    @ivar buf: the data sent to the transport.
 
429
    @type buf: C{str}
 
430
 
 
431
    @ivar close: flags indicating if the transport has been closed.
 
432
    @type close: C{bool}
 
433
    """
 
434
 
 
435
    buf = ''
 
436
    close = False
 
437
 
 
438
 
 
439
    def write(self, data):
 
440
        """
 
441
        Record data in the buffer.
 
442
        """
 
443
        self.buf += data
 
444
 
 
445
 
 
446
    def loseConnection(self):
 
447
        """
 
448
        Note that the connection was closed.
 
449
        """
 
450
        self.close = True
 
451
 
 
452
 
 
453
class StubTransportWithWriteErr(StubTransport):
 
454
    """
 
455
    A version of StubTransport which records the error data sent to it.
 
456
 
 
457
    @ivar err: the extended data sent to the transport.
 
458
    @type err: C{str}
 
459
    """
 
460
 
 
461
    err = ''
 
462
 
 
463
 
 
464
    def writeErr(self, data):
 
465
        """
 
466
        Record the extended data in the buffer.  This was an old interface
 
467
        that allowed the Transports from ISession.openShell() or
 
468
        ISession.execCommand() to receive extended data from the client.
 
469
        """
 
470
        self.err += data
 
471
 
 
472
 
 
473
 
 
474
class StubClient(object):
 
475
    """
 
476
    A stub class representing the client to a SSHSession.
 
477
 
 
478
    @ivar transport: A L{StubTransport} object which keeps track of the data
 
479
        passed to it.
 
480
    """
 
481
 
 
482
 
 
483
    def __init__(self):
 
484
        self.transport = StubTransportWithWriteErr()
 
485
 
 
486
 
 
487
 
 
488
class SessionInterfaceTestCase(unittest.TestCase):
 
489
    """
 
490
    Tests for the SSHSession class interface.  This interface is not ideal, but
 
491
    it is tested in order to maintain backwards compatibility.
 
492
    """
 
493
 
 
494
 
 
495
    def setUp(self):
 
496
        """
 
497
        Make an SSHSession object to test.  Give the channel some window
 
498
        so that it's allowed to send packets.  500 and 100 are arbitrary
 
499
        values.
 
500
        """
 
501
        self.session = session.SSHSession(remoteWindow=500,
 
502
                remoteMaxPacket=100, conn=StubConnection(),
 
503
                avatar=StubAvatar())
 
504
 
 
505
 
 
506
    def assertSessionIsStubSession(self):
 
507
        """
 
508
        Asserts that self.session.session is an instance of
 
509
        StubSessionForStubOldAvatar.
 
510
        """
 
511
        self.assertIsInstance(self.session.session,
 
512
                              StubSessionForStubAvatar)
 
513
 
 
514
 
 
515
    def test_init(self):
 
516
        """
 
517
        SSHSession initializes its buffer (buf), client, and ISession adapter.
 
518
        The avatar should not need to be adaptable to an ISession immediately.
 
519
        """
 
520
        s = session.SSHSession(avatar=object) # use object because it doesn't
 
521
                                              # have an adapter
 
522
        self.assertEquals(s.buf, '')
 
523
        self.assertIdentical(s.client, None)
 
524
        self.assertIdentical(s.session, None)
 
525
 
 
526
 
 
527
    def test_client_dataReceived(self):
 
528
        """
 
529
        SSHSession.dataReceived() passes data along to a client.  If the data
 
530
        comes before there is a client, the data should be discarded.
 
531
        """
 
532
        self.session.dataReceived('1')
 
533
        self.session.client = StubClient()
 
534
        self.session.dataReceived('2')
 
535
        self.assertEquals(self.session.client.transport.buf, '2')
 
536
 
 
537
    def test_client_extReceived(self):
 
538
        """
 
539
        SSHSession.extReceived() passed data of type EXTENDED_DATA_STDERR along
 
540
        to the client.  If the data comes before there is a client, or if the
 
541
        data is not of type EXTENDED_DATA_STDERR, it is discared.
 
542
        """
 
543
        self.session.extReceived(connection.EXTENDED_DATA_STDERR, '1')
 
544
        self.session.extReceived(255, '2') # 255 is arbitrary
 
545
        self.session.client = StubClient()
 
546
        self.session.extReceived(connection.EXTENDED_DATA_STDERR, '3')
 
547
        self.assertEquals(self.session.client.transport.err, '3')
 
548
 
 
549
 
 
550
    def test_client_extReceivedWithoutWriteErr(self):
 
551
        """
 
552
        SSHSession.extReceived() should handle the case where the transport
 
553
        on the client doesn't have a writeErr method.
 
554
        """
 
555
        client = self.session.client = StubClient()
 
556
        client.transport = StubTransport() # doesn't have writeErr
 
557
 
 
558
        # should not raise an error
 
559
        self.session.extReceived(connection.EXTENDED_DATA_STDERR, 'ignored')
 
560
 
 
561
 
 
562
 
 
563
    def test_client_closed(self):
 
564
        """
 
565
        SSHSession.closed() should tell the transport connected to the client
 
566
        that the connection was lost.
 
567
        """
 
568
        self.session.client = StubClient()
 
569
        self.session.closed()
 
570
        self.assertTrue(self.session.client.transport.close)
 
571
        self.session.client.transport.close = False
 
572
 
 
573
 
 
574
    def test_badSubsystemDoesNotCreateClient(self):
 
575
        """
 
576
        When a subsystem request fails, SSHSession.client should not be set.
 
577
        """
 
578
        ret = self.session.requestReceived(
 
579
            'subsystem', common.NS('BadSubsystem'))
 
580
        self.assertFalse(ret)
 
581
        self.assertIdentical(self.session.client, None)
 
582
 
 
583
 
 
584
    def test_lookupSubsystem(self):
 
585
        """
 
586
        When a client requests a subsystem, the SSHSession object should get
 
587
        the subsystem by calling avatar.lookupSubsystem, and attach it as
 
588
        the client.
 
589
        """
 
590
        ret = self.session.requestReceived(
 
591
            'subsystem', common.NS('TestSubsystem') + 'data')
 
592
        self.assertTrue(ret)
 
593
        self.assertIsInstance(self.session.client, protocol.ProcessProtocol)
 
594
        self.assertIdentical(self.session.client.transport.proto,
 
595
                             self.session.avatar.subsystem)
 
596
 
 
597
 
 
598
 
 
599
    def test_lookupSubsystemDoesNotNeedISession(self):
 
600
        """
 
601
        Previously, if one only wanted to implement a subsystem, an ISession
 
602
        adapter wasn't needed because subsystems were looked up using the
 
603
        lookupSubsystem method on the avatar.
 
604
        """
 
605
        s = session.SSHSession(avatar=SubsystemOnlyAvatar(),
 
606
                               conn=StubConnection())
 
607
        ret = s.request_subsystem(
 
608
            common.NS('subsystem') + 'data')
 
609
        self.assertTrue(ret)
 
610
        self.assertNotIdentical(s.client, None)
 
611
        self.assertIdentical(s.conn.closes.get(s), None)
 
612
        s.eofReceived()
 
613
        self.assertTrue(s.conn.closes.get(s))
 
614
        # these should not raise errors
 
615
        s.loseConnection()
 
616
        s.closed()
 
617
 
 
618
 
 
619
    def test_lookupSubsystem_data(self):
 
620
        """
 
621
        After having looked up a subsystem, data should be passed along to the
 
622
        client.  Additionally, subsystems were passed the entire request packet
 
623
        as data, instead of just the additional data.
 
624
 
 
625
        We check for the additional tidle to verify that the data passed
 
626
        through the client.
 
627
        """
 
628
        #self.session.dataReceived('1')
 
629
        # subsystems didn't get extended data
 
630
        #self.session.extReceived(connection.EXTENDED_DATA_STDERR, '2')
 
631
 
 
632
        self.session.requestReceived('subsystem',
 
633
                                     common.NS('TestSubsystem') + 'data')
 
634
 
 
635
        self.assertEquals(self.session.conn.data[self.session],
 
636
                ['\x00\x00\x00\x0dTestSubsystemdata~'])
 
637
        self.session.dataReceived('more data')
 
638
        self.assertEquals(self.session.conn.data[self.session][-1],
 
639
                'more data~')
 
640
 
 
641
 
 
642
    def test_lookupSubsystem_closeReceived(self):
 
643
        """
 
644
        SSHSession.closeReceived() should sent a close message to the remote
 
645
        side.
 
646
        """
 
647
        self.session.requestReceived('subsystem',
 
648
                                     common.NS('TestSubsystem') + 'data')
 
649
 
 
650
        self.session.closeReceived()
 
651
        self.assertTrue(self.session.conn.closes[self.session])
 
652
 
 
653
 
 
654
    def assertRequestRaisedRuntimeError(self):
 
655
        """
 
656
        Assert that the request we just made raised a RuntimeError (and only a
 
657
        RuntimeError).
 
658
        """
 
659
        errors = self.flushLoggedErrors(RuntimeError)
 
660
        self.assertEquals(len(errors), 1, "Multiple RuntimeErrors raised: %s" %
 
661
                          '\n'.join([repr(error) for error in errors]))
 
662
        errors[0].trap(RuntimeError)
 
663
 
 
664
 
 
665
    def test_requestShell(self):
 
666
        """
 
667
        When a client requests a shell, the SSHSession object should get
 
668
        the shell by getting an ISession adapter for the avatar, then
 
669
        calling openShell() with a ProcessProtocol to attach.
 
670
        """
 
671
        # gets a shell the first time
 
672
        ret = self.session.requestReceived('shell', '')
 
673
        self.assertTrue(ret)
 
674
        self.assertSessionIsStubSession()
 
675
        self.assertIsInstance(self.session.client,
 
676
                              session.SSHSessionProcessProtocol)
 
677
        self.assertIdentical(self.session.session.shellProtocol,
 
678
                self.session.client)
 
679
        # doesn't get a shell the second time
 
680
        self.assertFalse(self.session.requestReceived('shell', ''))
 
681
        self.assertRequestRaisedRuntimeError()
 
682
 
 
683
 
 
684
    def test_requestShellWithData(self):
 
685
        """
 
686
        When a client executes a shell, it should be able to give pass data
 
687
        back and forth between the local and the remote side.
 
688
        """
 
689
        ret = self.session.requestReceived('shell', '')
 
690
        self.assertTrue(ret)
 
691
        self.assertSessionIsStubSession()
 
692
        self.session.dataReceived('some data\x00')
 
693
        self.assertEquals(self.session.session.shellTransport.data,
 
694
                          'some data\x00')
 
695
        self.assertEquals(self.session.conn.data[self.session],
 
696
                          ['some data\x00', '\r\n'])
 
697
        self.assertTrue(self.session.session.shellTransport.closed)
 
698
        self.assertEquals(self.session.conn.requests[self.session],
 
699
                          [('exit-status', '\x00\x00\x00\x00', False)])
 
700
 
 
701
 
 
702
    def test_requestExec(self):
 
703
        """
 
704
        When a client requests a command, the SSHSession object should get
 
705
        the command by getting an ISession adapter for the avatar, then
 
706
        calling execCommand with a ProcessProtocol to attach and the
 
707
        command line.
 
708
        """
 
709
        ret = self.session.requestReceived('exec',
 
710
                                           common.NS('failure'))
 
711
        self.assertFalse(ret)
 
712
        self.assertRequestRaisedRuntimeError()
 
713
        self.assertIdentical(self.session.client, None)
 
714
 
 
715
        self.assertTrue(self.session.requestReceived('exec',
 
716
                                                     common.NS('success')))
 
717
        self.assertSessionIsStubSession()
 
718
        self.assertIsInstance(self.session.client,
 
719
                              session.SSHSessionProcessProtocol)
 
720
        self.assertIdentical(self.session.session.execProtocol,
 
721
                self.session.client)
 
722
        self.assertEquals(self.session.session.execCommandLine,
 
723
                'success')
 
724
 
 
725
 
 
726
    def test_requestExecWithData(self):
 
727
        """
 
728
        When a client executes a command, it should be able to give pass data
 
729
        back and forth.
 
730
        """
 
731
        ret = self.session.requestReceived('exec',
 
732
                                           common.NS('repeat hello'))
 
733
        self.assertTrue(ret)
 
734
        self.assertSessionIsStubSession()
 
735
        self.session.dataReceived('some data')
 
736
        self.assertEquals(self.session.session.execTransport.data, 'some data')
 
737
        self.assertEquals(self.session.conn.data[self.session],
 
738
                          ['hello', 'some data', '\r\n'])
 
739
        self.session.eofReceived()
 
740
        self.session.closeReceived()
 
741
        self.session.closed()
 
742
        self.assertTrue(self.session.session.execTransport.closed)
 
743
        self.assertEquals(self.session.conn.requests[self.session],
 
744
                          [('exit-status', '\x00\x00\x00\x00', False)])
 
745
 
 
746
 
 
747
    def test_requestPty(self):
 
748
        """
 
749
        When a client requests a PTY, the SSHSession object should make
 
750
        the request by getting an ISession adapter for the avatar, then
 
751
        calling getPty with the terminal type, the window size, and any modes
 
752
        the client gave us.
 
753
        """
 
754
        # 'bad' terminal type fails
 
755
        ret = self.session.requestReceived(
 
756
            'pty_req',  session.packRequest_pty_req(
 
757
                'bad', (1, 2, 3, 4), ''))
 
758
        self.assertFalse(ret)
 
759
        self.assertSessionIsStubSession()
 
760
        self.assertRequestRaisedRuntimeError()
 
761
        # 'good' terminal type succeeds
 
762
        self.assertTrue(self.session.requestReceived('pty_req',
 
763
            session.packRequest_pty_req('good', (1, 2, 3, 4), '')))
 
764
        self.assertEquals(self.session.session.ptyRequest,
 
765
                ('good', (1, 2, 3, 4), []))
 
766
 
 
767
 
 
768
    def test_requestWindowChange(self):
 
769
        """
 
770
        When the client requests to change the window size, the SSHSession
 
771
        object should make the request by getting an ISession adapter for the
 
772
        avatar, then calling windowChanged with the new window size.
 
773
        """
 
774
        ret = self.session.requestReceived(
 
775
            'window_change',
 
776
            session.packRequest_window_change((0, 0, 0, 0)))
 
777
        self.assertFalse(ret)
 
778
        self.assertRequestRaisedRuntimeError()
 
779
        self.assertSessionIsStubSession()
 
780
        self.assertTrue(self.session.requestReceived('window_change',
 
781
            session.packRequest_window_change((1, 2, 3, 4))))
 
782
        self.assertEquals(self.session.session.windowChange,
 
783
                (1, 2, 3, 4))
 
784
 
 
785
 
 
786
    def test_eofReceived(self):
 
787
        """
 
788
        When an EOF is received and a ISession adapter is present, it should
 
789
        be notified of the EOF message.
 
790
        """
 
791
        self.session.session = session.ISession(self.session.avatar)
 
792
        self.session.eofReceived()
 
793
        self.assertTrue(self.session.session.gotEOF)
 
794
 
 
795
 
 
796
    def test_closeReceived(self):
 
797
        """
 
798
        When a close is received, the session should send a close message.
 
799
        """
 
800
        ret = self.session.closeReceived()
 
801
        self.assertIdentical(ret, None)
 
802
        self.assertTrue(self.session.conn.closes[self.session])
 
803
 
 
804
 
 
805
    def test_closed(self):
 
806
        """
 
807
        When a close is received and a ISession adapter is present, it should
 
808
        be notified of the close message.
 
809
        """
 
810
        self.session.session = session.ISession(self.session.avatar)
 
811
        self.session.closed()
 
812
        self.assertTrue(self.session.session.gotClosed)
 
813
 
 
814
 
 
815
 
 
816
class SessionWithNoAvatarTestCase(unittest.TestCase):
 
817
    """
 
818
    Test for the SSHSession interface.  Several of the methods (request_shell,
 
819
    request_exec, request_pty_req, request_window_change) would create a
 
820
    'session' instance variable from the avatar if one didn't exist when they
 
821
    were called.
 
822
    """
 
823
 
 
824
 
 
825
    def setUp(self):
 
826
        self.session = session.SSHSession()
 
827
        self.session.avatar = StubAvatar()
 
828
        self.assertIdentical(self.session.session, None)
 
829
 
 
830
 
 
831
    def assertSessionProvidesISession(self):
 
832
        """
 
833
        self.session.session should provide I{ISession}.
 
834
        """
 
835
        self.assertTrue(session.ISession.providedBy(self.session.session),
 
836
                        "ISession not provided by %r" % self.session.session)
 
837
 
 
838
 
 
839
    def test_requestShellGetsSession(self):
 
840
        """
 
841
        If an ISession adapter isn't already present, request_shell should get
 
842
        one.
 
843
        """
 
844
        self.session.requestReceived('shell', '')
 
845
        self.assertSessionProvidesISession()
 
846
 
 
847
 
 
848
    def test_requestExecGetsSession(self):
 
849
        """
 
850
        If an ISession adapter isn't already present, request_exec should get
 
851
        one.
 
852
        """
 
853
        self.session.requestReceived('exec',
 
854
                                     common.NS('success'))
 
855
        self.assertSessionProvidesISession()
 
856
 
 
857
 
 
858
    def test_requestPtyReqGetsSession(self):
 
859
        """
 
860
        If an ISession adapter isn't already present, request_pty_req should
 
861
        get one.
 
862
        """
 
863
        self.session.requestReceived('pty_req',
 
864
                                     session.packRequest_pty_req(
 
865
                'term', (0, 0, 0, 0), ''))
 
866
        self.assertSessionProvidesISession()
 
867
 
 
868
 
 
869
    def test_requestWindowChangeGetsSession(self):
 
870
        """
 
871
        If an ISession adapter isn't already present, request_window_change
 
872
        should get one.
 
873
        """
 
874
        self.session.requestReceived(
 
875
            'window_change',
 
876
            session.packRequest_window_change(
 
877
                (1, 1, 1, 1)))
 
878
        self.assertSessionProvidesISession()
 
879
 
 
880
 
 
881
 
 
882
class WrappersTestCase(unittest.TestCase):
 
883
    """
 
884
    A test for the wrapProtocol and wrapProcessProtocol functions.
 
885
    """
 
886
 
 
887
    def test_wrapProtocol(self):
 
888
        """
 
889
        L{wrapProtocol}, when passed a L{Protocol} should return something that
 
890
        has write(), writeSequence(), loseConnection() methods which call the
 
891
        Protocol's dataReceived() and connectionLost() methods, respectively.
 
892
        """
 
893
        protocol = MockProtocol()
 
894
        protocol.transport = StubTransport()
 
895
        protocol.connectionMade()
 
896
        wrapped = session.wrapProtocol(protocol)
 
897
        wrapped.dataReceived('dataReceived')
 
898
        self.assertEquals(protocol.transport.buf, 'dataReceived')
 
899
        wrapped.write('data')
 
900
        wrapped.writeSequence(['1', '2'])
 
901
        wrapped.loseConnection()
 
902
        self.assertEquals(protocol.data, 'data12')
 
903
        protocol.reason.trap(error.ConnectionDone)
 
904
 
 
905
    def test_wrapProcessProtocol_Protocol(self):
 
906
        """
 
907
        L{wrapPRocessProtocol}, when passed a L{Protocol} should return
 
908
        something that follows the L{IProcessProtocol} interface, with
 
909
        connectionMade() mapping to connectionMade(), outReceived() mapping to
 
910
        dataReceived() and processEnded() mapping to connectionLost().
 
911
        """
 
912
        protocol = MockProtocol()
 
913
        protocol.transport = StubTransport()
 
914
        process_protocol = session.wrapProcessProtocol(protocol)
 
915
        process_protocol.connectionMade()
 
916
        process_protocol.outReceived('data')
 
917
        self.assertEquals(protocol.transport.buf, 'data~')
 
918
        process_protocol.processEnded(failure.Failure(
 
919
            error.ProcessTerminated(0, None, None)))
 
920
        protocol.reason.trap(error.ProcessTerminated)
 
921
 
 
922
 
 
923
 
 
924
class TestHelpers(unittest.TestCase):
 
925
    """
 
926
    Tests for the 4 helper functions: parseRequest_* and packRequest_*.
 
927
    """
 
928
 
 
929
 
 
930
    def test_parseRequest_pty_req(self):
 
931
        """
 
932
        The payload of a pty-req message is::
 
933
            string  terminal
 
934
            uint32  columns
 
935
            uint32  rows
 
936
            uint32  x pixels
 
937
            uint32  y pixels
 
938
            string  modes
 
939
 
 
940
        Modes are::
 
941
            byte    mode number
 
942
            uint32  mode value
 
943
        """
 
944
        self.assertEquals(session.parseRequest_pty_req(common.NS('xterm') +
 
945
                                                       struct.pack('>4L',
 
946
                                                                   1, 2, 3, 4)
 
947
                                                       + common.NS(
 
948
                    struct.pack('>BL', 5, 6))),
 
949
                          ('xterm', (2, 1, 3, 4), [(5, 6)]))
 
950
 
 
951
 
 
952
    def test_packRequest_pty_req_old(self):
 
953
        """
 
954
        See test_parseRequest_pty_req for the payload format.
 
955
        """
 
956
        packed = session.packRequest_pty_req('xterm', (2, 1, 3, 4),
 
957
                                             '\x05\x00\x00\x00\x06')
 
958
 
 
959
        self.assertEquals(packed,
 
960
                          common.NS('xterm') + struct.pack('>4L', 1, 2, 3, 4) +
 
961
                          common.NS(struct.pack('>BL', 5, 6)))
 
962
 
 
963
 
 
964
    def test_packRequest_pty_req(self):
 
965
        """
 
966
        See test_parseRequest_pty_req for the payload format.
 
967
        """
 
968
        packed = session.packRequest_pty_req('xterm', (2, 1, 3, 4),
 
969
                                             '\x05\x00\x00\x00\x06')
 
970
        self.assertEquals(packed,
 
971
                          common.NS('xterm') + struct.pack('>4L', 1, 2, 3, 4) +
 
972
                          common.NS(struct.pack('>BL', 5, 6)))
 
973
 
 
974
 
 
975
    def test_parseRequest_window_change(self):
 
976
        """
 
977
        The payload of a window_change request is::
 
978
            uint32  columns
 
979
            uint32  rows
 
980
            uint32  x pixels
 
981
            uint32  y pixels
 
982
 
 
983
        parseRequest_window_change() returns (rows, columns, x pixels,
 
984
        y pixels).
 
985
        """
 
986
        self.assertEquals(session.parseRequest_window_change(
 
987
                struct.pack('>4L', 1, 2, 3, 4)), (2, 1, 3, 4))
 
988
 
 
989
 
 
990
    def test_packRequest_window_change(self):
 
991
        """
 
992
        See test_parseRequest_window_change for the payload format.
 
993
        """
 
994
        self.assertEquals(session.packRequest_window_change((2, 1, 3, 4)),
 
995
                          struct.pack('>4L', 1, 2, 3, 4))
 
996
 
 
997
 
 
998
 
 
999
class SSHSessionProcessProtocolTestCase(unittest.TestCase):
 
1000
    """
 
1001
    Tests for L{SSHSessionProcessProtocol}.
 
1002
    """
 
1003
 
 
1004
    def setUp(self):
 
1005
        self.session = session.SSHSession(
 
1006
            conn=StubConnection(), remoteWindow=500, remoteMaxPacket=100)
 
1007
        self.transport = StubTransport()
 
1008
        self.pp = session.SSHSessionProcessProtocol(self.session)
 
1009
        self.pp.makeConnection(self.transport)
 
1010
 
 
1011
 
 
1012
    def assertSessionClosed(self):
 
1013
        """
 
1014
        Assert that C{self.session} is closed.
 
1015
        """
 
1016
        self.assertTrue(self.session.conn.closes[self.session])
 
1017
 
 
1018
 
 
1019
    def assertRequestsEqual(self, expectedRequests):
 
1020
        """
 
1021
        Assert that C{self.session} has sent the C{expectedRequests}.
 
1022
        """
 
1023
        self.assertEqual(
 
1024
            self.session.conn.requests[self.session],
 
1025
            expectedRequests)
 
1026
 
 
1027
 
 
1028
    def test_init(self):
 
1029
        """
 
1030
        SSHSessionProcessProtocol should set self.session to the session passed
 
1031
        to the __init__ method.
 
1032
        """
 
1033
        self.assertEquals(self.pp.session, self.session)
 
1034
 
 
1035
 
 
1036
    def test_connectionMade(self):
 
1037
        """
 
1038
        SSHSessionProcessProtocol.connectionMade() should check if there's a
 
1039
        'buf' attribute on its session and write it to the transport if so.
 
1040
        """
 
1041
        self.session.buf = 'buffer'
 
1042
        self.pp.connectionMade()
 
1043
        self.assertEquals(self.transport.buf, 'buffer')
 
1044
 
 
1045
 
 
1046
    def test_getSignalName(self):
 
1047
        """
 
1048
        _getSignalName should return the name of a signal when given the
 
1049
        signal number.
 
1050
        """
 
1051
        for signalName in session.SUPPORTED_SIGNALS:
 
1052
            signalName = 'SIG' + signalName
 
1053
            signalValue = getattr(signal, signalName)
 
1054
            sshName = self.pp._getSignalName(signalValue)
 
1055
            self.assertEquals(sshName, signalName,
 
1056
                              "%i: %s != %s" % (signalValue, sshName,
 
1057
                                                signalName))
 
1058
 
 
1059
 
 
1060
    def test_getSignalNameWithLocalSignal(self):
 
1061
        """
 
1062
        If there are signals in the signal module which aren't in the SSH RFC,
 
1063
        we map their name to [signal name]@[platform].
 
1064
        """
 
1065
        signal.SIGTwistedTest = signal.NSIG + 1 # value can't exist normally
 
1066
        # Force reinitialization of signals
 
1067
        self.pp._signalValuesToNames = None
 
1068
        self.assertEquals(self.pp._getSignalName(signal.SIGTwistedTest),
 
1069
                          'SIGTwistedTest@' + sys.platform)
 
1070
 
 
1071
 
 
1072
    if getattr(signal, 'SIGALRM', None) is None:
 
1073
        test_getSignalName.skip = test_getSignalNameWithLocalSignal.skip = \
 
1074
            "Not all signals available"
 
1075
 
 
1076
 
 
1077
    def test_outReceived(self):
 
1078
        """
 
1079
        When data is passed to the outReceived method, it should be sent to
 
1080
        the session's write method.
 
1081
        """
 
1082
        self.pp.outReceived('test data')
 
1083
        self.assertEquals(self.session.conn.data[self.session],
 
1084
                ['test data'])
 
1085
 
 
1086
 
 
1087
    def test_write(self):
 
1088
        """
 
1089
        When data is passed to the write method, it should be sent to the
 
1090
        session channel's write method.
 
1091
        """
 
1092
        self.pp.write('test data')
 
1093
        self.assertEquals(self.session.conn.data[self.session],
 
1094
                ['test data'])
 
1095
 
 
1096
    def test_writeSequence(self):
 
1097
        """
 
1098
        When a sequence is passed to the writeSequence method, it should be
 
1099
        joined together and sent to the session channel's write method.
 
1100
        """
 
1101
        self.pp.writeSequence(['test ', 'data'])
 
1102
        self.assertEquals(self.session.conn.data[self.session],
 
1103
                ['test data'])
 
1104
 
 
1105
 
 
1106
    def test_errReceived(self):
 
1107
        """
 
1108
        When data is passed to the errReceived method, it should be sent to
 
1109
        the session's writeExtended method.
 
1110
        """
 
1111
        self.pp.errReceived('test data')
 
1112
        self.assertEquals(self.session.conn.extData[self.session],
 
1113
                [(1, 'test data')])
 
1114
 
 
1115
 
 
1116
    def test_inConnectionLost(self):
 
1117
        """
 
1118
        When inConnectionLost is called, it should send an EOF message,
 
1119
        """
 
1120
        self.pp.inConnectionLost()
 
1121
        self.assertTrue(self.session.conn.eofs[self.session])
 
1122
 
 
1123
 
 
1124
    def test_loseConnection(self):
 
1125
        """
 
1126
        When loseConnection() is called, it should call loseConnection
 
1127
        on the session channel.
 
1128
        """
 
1129
        self.pp.loseConnection()
 
1130
        self.assertTrue(self.session.conn.closes[self.session])
 
1131
 
 
1132
 
 
1133
    def test_connectionLost(self):
 
1134
        """
 
1135
        When connectionLost() is called, it should call loseConnection()
 
1136
        on the session channel.
 
1137
        """
 
1138
        self.pp.connectionLost(failure.Failure(
 
1139
                ProcessDone(0)))
 
1140
 
 
1141
 
 
1142
    def test_processEndedWithExitCode(self):
 
1143
        """
 
1144
        When processEnded is called, if there is an exit code in the reason
 
1145
        it should be sent in an exit-status method.  The connection should be
 
1146
        closed.
 
1147
        """
 
1148
        self.pp.processEnded(Failure(ProcessDone(None)))
 
1149
        self.assertRequestsEqual(
 
1150
            [('exit-status', struct.pack('>I', 0) , False)])
 
1151
        self.assertSessionClosed()
 
1152
 
 
1153
 
 
1154
    def test_processEndedWithExitSignalCoreDump(self):
 
1155
        """
 
1156
        When processEnded is called, if there is an exit signal in the reason
 
1157
        it should be sent in an exit-signal message.  The connection should be
 
1158
        closed.
 
1159
        """
 
1160
        self.pp.processEnded(
 
1161
            Failure(ProcessTerminated(1,
 
1162
                signal.SIGTERM, 1 << 7))) # 7th bit means core dumped
 
1163
        self.assertRequestsEqual(
 
1164
            [('exit-signal',
 
1165
              common.NS('TERM') # signal name
 
1166
              + '\x01' # core dumped is true
 
1167
              + common.NS('') # error message
 
1168
              + common.NS(''), # language tag
 
1169
              False)])
 
1170
        self.assertSessionClosed()
 
1171
 
 
1172
 
 
1173
    def test_processEndedWithExitSignalNoCoreDump(self):
 
1174
        """
 
1175
        When processEnded is called, if there is an exit signal in the
 
1176
        reason it should be sent in an exit-signal message.  If no
 
1177
        core was dumped, don't set the core-dump bit.
 
1178
        """
 
1179
        self.pp.processEnded(
 
1180
            Failure(ProcessTerminated(1, signal.SIGTERM, 0)))
 
1181
        # see comments in test_processEndedWithExitSignalCoreDump for the
 
1182
        # meaning of the parts in the request
 
1183
        self.assertRequestsEqual(
 
1184
             [('exit-signal', common.NS('TERM') + '\x00' + common.NS('') +
 
1185
               common.NS(''), False)])
 
1186
        self.assertSessionClosed()
 
1187
 
 
1188
 
 
1189
    if getattr(os, 'WCOREDUMP', None) is None:
 
1190
        skipMsg = "can't run this w/o os.WCOREDUMP"
 
1191
        test_processEndedWithExitSignalCoreDump.skip = skipMsg
 
1192
        test_processEndedWithExitSignalNoCoreDump.skip = skipMsg
 
1193
 
 
1194
 
 
1195
 
 
1196
class SSHSessionClientTestCase(unittest.TestCase):
 
1197
    """
 
1198
    SSHSessionClient is an obsolete class used to connect standard IO to
 
1199
    an SSHSession.
 
1200
    """
 
1201
 
 
1202
 
 
1203
    def test_dataReceived(self):
 
1204
        """
 
1205
        When data is received, it should be sent to the transport.
 
1206
        """
 
1207
        client = session.SSHSessionClient()
 
1208
        client.transport = StubTransport()
 
1209
        client.dataReceived('test data')
 
1210
        self.assertEquals(client.transport.buf, 'test data')