~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/conch/test/test_insults.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.conch.test.test_insults -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
from twisted.trial import unittest
 
6
from twisted.test.proto_helpers import StringTransport
 
7
 
 
8
from twisted.conch.insults.insults import ServerProtocol, ClientProtocol
 
9
from twisted.conch.insults.insults import CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE, CS_ALTERNATE_SPECIAL
 
10
from twisted.conch.insults.insults import G0, G1
 
11
from twisted.conch.insults.insults import modes
 
12
 
 
13
def _getattr(mock, name):
 
14
    return super(Mock, mock).__getattribute__(name)
 
15
 
 
16
def occurrences(mock):
 
17
    return _getattr(mock, 'occurrences')
 
18
 
 
19
def methods(mock):
 
20
    return _getattr(mock, 'methods')
 
21
 
 
22
def _append(mock, obj):
 
23
    occurrences(mock).append(obj)
 
24
 
 
25
default = object()
 
26
 
 
27
class Mock(object):
 
28
    callReturnValue = default
 
29
 
 
30
    def __init__(self, methods=None, callReturnValue=default):
 
31
        """
 
32
        @param methods: Mapping of names to return values
 
33
        @param callReturnValue: object __call__ should return
 
34
        """
 
35
        self.occurrences = []
 
36
        if methods is None:
 
37
            methods = {}
 
38
        self.methods = methods
 
39
        if callReturnValue is not default:
 
40
            self.callReturnValue = callReturnValue
 
41
 
 
42
    def __call__(self, *a, **kw):
 
43
        returnValue = _getattr(self, 'callReturnValue')
 
44
        if returnValue is default:
 
45
            returnValue = Mock()
 
46
        # _getattr(self, 'occurrences').append(('__call__', returnValue, a, kw))
 
47
        _append(self, ('__call__', returnValue, a, kw))
 
48
        return returnValue
 
49
 
 
50
    def __getattribute__(self, name):
 
51
        methods = _getattr(self, 'methods')
 
52
        if name in methods:
 
53
            attrValue = Mock(callReturnValue=methods[name])
 
54
        else:
 
55
            attrValue = Mock()
 
56
        # _getattr(self, 'occurrences').append((name, attrValue))
 
57
        _append(self, (name, attrValue))
 
58
        return attrValue
 
59
 
 
60
class MockMixin:
 
61
    def assertCall(self, occurrence, methodName, expectedPositionalArgs=(),
 
62
                   expectedKeywordArgs={}):
 
63
        attr, mock = occurrence
 
64
        self.assertEquals(attr, methodName)
 
65
        self.assertEquals(len(occurrences(mock)), 1)
 
66
        [(call, result, args, kw)] = occurrences(mock)
 
67
        self.assertEquals(call, "__call__")
 
68
        self.assertEquals(args, expectedPositionalArgs)
 
69
        self.assertEquals(kw, expectedKeywordArgs)
 
70
        return result
 
71
 
 
72
 
 
73
_byteGroupingTestTemplate = """\
 
74
def testByte%(groupName)s(self):
 
75
    transport = StringTransport()
 
76
    proto = Mock()
 
77
    parser = self.protocolFactory(lambda: proto)
 
78
    parser.factory = self
 
79
    parser.makeConnection(transport)
 
80
 
 
81
    bytes = self.TEST_BYTES
 
82
    while bytes:
 
83
        chunk = bytes[:%(bytesPer)d]
 
84
        bytes = bytes[%(bytesPer)d:]
 
85
        parser.dataReceived(chunk)
 
86
 
 
87
    self.verifyResults(transport, proto, parser)
 
88
"""
 
89
class ByteGroupingsMixin(MockMixin):
 
90
    protocolFactory = None
 
91
 
 
92
    for word, n in [('Pairs', 2), ('Triples', 3), ('Quads', 4), ('Quints', 5), ('Sexes', 6)]:
 
93
        exec _byteGroupingTestTemplate % {'groupName': word, 'bytesPer': n}
 
94
    del word, n
 
95
 
 
96
    def verifyResults(self, transport, proto, parser):
 
97
        result = self.assertCall(occurrences(proto).pop(0), "makeConnection", (parser,))
 
98
        self.assertEquals(occurrences(result), [])
 
99
 
 
100
del _byteGroupingTestTemplate
 
101
 
 
102
class ServerArrowKeys(ByteGroupingsMixin, unittest.TestCase):
 
103
    protocolFactory = ServerProtocol
 
104
 
 
105
    # All the arrow keys once
 
106
    TEST_BYTES = '\x1b[A\x1b[B\x1b[C\x1b[D'
 
107
 
 
108
    def verifyResults(self, transport, proto, parser):
 
109
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)
 
110
 
 
111
        for arrow in (parser.UP_ARROW, parser.DOWN_ARROW,
 
112
                      parser.RIGHT_ARROW, parser.LEFT_ARROW):
 
113
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (arrow, None))
 
114
            self.assertEquals(occurrences(result), [])
 
115
        self.failIf(occurrences(proto))
 
116
 
 
117
 
 
118
class PrintableCharacters(ByteGroupingsMixin, unittest.TestCase):
 
119
    protocolFactory = ServerProtocol
 
120
 
 
121
    # Some letters and digits, first on their own, then capitalized,
 
122
    # then modified with alt
 
123
 
 
124
    TEST_BYTES = 'abc123ABC!@#\x1ba\x1bb\x1bc\x1b1\x1b2\x1b3'
 
125
 
 
126
    def verifyResults(self, transport, proto, parser):
 
127
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)
 
128
 
 
129
        for char in 'abc123ABC!@#':
 
130
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (char, None))
 
131
            self.assertEquals(occurrences(result), [])
 
132
 
 
133
        for char in 'abc123':
 
134
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (char, parser.ALT))
 
135
            self.assertEquals(occurrences(result), [])
 
136
 
 
137
        occs = occurrences(proto)
 
138
        self.failIf(occs, "%r should have been []" % (occs,))
 
139
 
 
140
class ServerFunctionKeys(ByteGroupingsMixin, unittest.TestCase):
 
141
    """Test for parsing and dispatching function keys (F1 - F12)
 
142
    """
 
143
    protocolFactory = ServerProtocol
 
144
 
 
145
    byteList = []
 
146
    for bytes in ('OP', 'OQ', 'OR', 'OS', # F1 - F4
 
147
                  '15~', '17~', '18~', '19~', # F5 - F8
 
148
                  '20~', '21~', '23~', '24~'): # F9 - F12
 
149
        byteList.append('\x1b[' + bytes)
 
150
    TEST_BYTES = ''.join(byteList)
 
151
    del byteList, bytes
 
152
 
 
153
    def verifyResults(self, transport, proto, parser):
 
154
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)
 
155
        for funcNum in range(1, 13):
 
156
            funcArg = getattr(parser, 'F%d' % (funcNum,))
 
157
            result = self.assertCall(occurrences(proto).pop(0), "keystrokeReceived", (funcArg, None))
 
158
            self.assertEquals(occurrences(result), [])
 
159
        self.failIf(occurrences(proto))
 
160
 
 
161
class ClientCursorMovement(ByteGroupingsMixin, unittest.TestCase):
 
162
    protocolFactory = ClientProtocol
 
163
 
 
164
    d2 = "\x1b[2B"
 
165
    r4 = "\x1b[4C"
 
166
    u1 = "\x1b[A"
 
167
    l2 = "\x1b[2D"
 
168
    # Move the cursor down two, right four, up one, left two, up one, left two
 
169
    TEST_BYTES = d2 + r4 + u1 + l2 + u1 + l2
 
170
    del d2, r4, u1, l2
 
171
 
 
172
    def verifyResults(self, transport, proto, parser):
 
173
        ByteGroupingsMixin.verifyResults(self, transport, proto, parser)
 
174
 
 
175
        for (method, count) in [('Down', 2), ('Forward', 4), ('Up', 1),
 
176
                                ('Backward', 2), ('Up', 1), ('Backward', 2)]:
 
177
            result = self.assertCall(occurrences(proto).pop(0), "cursor" + method, (count,))
 
178
            self.assertEquals(occurrences(result), [])
 
179
        self.failIf(occurrences(proto))
 
180
 
 
181
class ClientControlSequences(unittest.TestCase, MockMixin):
 
182
    def setUp(self):
 
183
        self.transport = StringTransport()
 
184
        self.proto = Mock()
 
185
        self.parser = ClientProtocol(lambda: self.proto)
 
186
        self.parser.factory = self
 
187
        self.parser.makeConnection(self.transport)
 
188
        result = self.assertCall(occurrences(self.proto).pop(0), "makeConnection", (self.parser,))
 
189
        self.failIf(occurrences(result))
 
190
 
 
191
    def testSimpleCardinals(self):
 
192
        self.parser.dataReceived(
 
193
            ''.join([''.join(['\x1b[' + str(n) + ch for n in ('', 2, 20, 200)]) for ch in 'BACD']))
 
194
        occs = occurrences(self.proto)
 
195
 
 
196
        for meth in ("Down", "Up", "Forward", "Backward"):
 
197
            for count in (1, 2, 20, 200):
 
198
                result = self.assertCall(occs.pop(0), "cursor" + meth, (count,))
 
199
                self.failIf(occurrences(result))
 
200
        self.failIf(occs)
 
201
 
 
202
    def testScrollRegion(self):
 
203
        self.parser.dataReceived('\x1b[5;22r\x1b[r')
 
204
        occs = occurrences(self.proto)
 
205
 
 
206
        result = self.assertCall(occs.pop(0), "setScrollRegion", (5, 22))
 
207
        self.failIf(occurrences(result))
 
208
 
 
209
        result = self.assertCall(occs.pop(0), "setScrollRegion", (None, None))
 
210
        self.failIf(occurrences(result))
 
211
        self.failIf(occs)
 
212
 
 
213
    def testHeightAndWidth(self):
 
214
        self.parser.dataReceived("\x1b#3\x1b#4\x1b#5\x1b#6")
 
215
        occs = occurrences(self.proto)
 
216
 
 
217
        result = self.assertCall(occs.pop(0), "doubleHeightLine", (True,))
 
218
        self.failIf(occurrences(result))
 
219
 
 
220
        result = self.assertCall(occs.pop(0), "doubleHeightLine", (False,))
 
221
        self.failIf(occurrences(result))
 
222
 
 
223
        result = self.assertCall(occs.pop(0), "singleWidthLine")
 
224
        self.failIf(occurrences(result))
 
225
 
 
226
        result = self.assertCall(occs.pop(0), "doubleWidthLine")
 
227
        self.failIf(occurrences(result))
 
228
        self.failIf(occs)
 
229
 
 
230
    def testCharacterSet(self):
 
231
        self.parser.dataReceived(
 
232
            ''.join([''.join(['\x1b' + g + n for n in 'AB012']) for g in '()']))
 
233
        occs = occurrences(self.proto)
 
234
 
 
235
        for which in (G0, G1):
 
236
            for charset in (CS_UK, CS_US, CS_DRAWING, CS_ALTERNATE, CS_ALTERNATE_SPECIAL):
 
237
                result = self.assertCall(occs.pop(0), "selectCharacterSet", (charset, which))
 
238
                self.failIf(occurrences(result))
 
239
        self.failIf(occs)
 
240
 
 
241
    def testShifting(self):
 
242
        self.parser.dataReceived("\x15\x14")
 
243
        occs = occurrences(self.proto)
 
244
 
 
245
        result = self.assertCall(occs.pop(0), "shiftIn")
 
246
        self.failIf(occurrences(result))
 
247
 
 
248
        result = self.assertCall(occs.pop(0), "shiftOut")
 
249
        self.failIf(occurrences(result))
 
250
        self.failIf(occs)
 
251
 
 
252
    def testSingleShifts(self):
 
253
        self.parser.dataReceived("\x1bN\x1bO")
 
254
        occs = occurrences(self.proto)
 
255
 
 
256
        result = self.assertCall(occs.pop(0), "singleShift2")
 
257
        self.failIf(occurrences(result))
 
258
 
 
259
        result = self.assertCall(occs.pop(0), "singleShift3")
 
260
        self.failIf(occurrences(result))
 
261
        self.failIf(occs)
 
262
 
 
263
    def testKeypadMode(self):
 
264
        self.parser.dataReceived("\x1b=\x1b>")
 
265
        occs = occurrences(self.proto)
 
266
 
 
267
        result = self.assertCall(occs.pop(0), "applicationKeypadMode")
 
268
        self.failIf(occurrences(result))
 
269
 
 
270
        result = self.assertCall(occs.pop(0), "numericKeypadMode")
 
271
        self.failIf(occurrences(result))
 
272
        self.failIf(occs)
 
273
 
 
274
    def testCursor(self):
 
275
        self.parser.dataReceived("\x1b7\x1b8")
 
276
        occs = occurrences(self.proto)
 
277
 
 
278
        result = self.assertCall(occs.pop(0), "saveCursor")
 
279
        self.failIf(occurrences(result))
 
280
 
 
281
        result = self.assertCall(occs.pop(0), "restoreCursor")
 
282
        self.failIf(occurrences(result))
 
283
        self.failIf(occs)
 
284
 
 
285
    def testReset(self):
 
286
        self.parser.dataReceived("\x1bc")
 
287
        occs = occurrences(self.proto)
 
288
 
 
289
        result = self.assertCall(occs.pop(0), "reset")
 
290
        self.failIf(occurrences(result))
 
291
        self.failIf(occs)
 
292
 
 
293
    def testIndex(self):
 
294
        self.parser.dataReceived("\x1bD\x1bM\x1bE")
 
295
        occs = occurrences(self.proto)
 
296
 
 
297
        result = self.assertCall(occs.pop(0), "index")
 
298
        self.failIf(occurrences(result))
 
299
 
 
300
        result = self.assertCall(occs.pop(0), "reverseIndex")
 
301
        self.failIf(occurrences(result))
 
302
 
 
303
        result = self.assertCall(occs.pop(0), "nextLine")
 
304
        self.failIf(occurrences(result))
 
305
        self.failIf(occs)
 
306
 
 
307
    def testModes(self):
 
308
        self.parser.dataReceived(
 
309
            "\x1b[" + ';'.join(map(str, [modes.KAM, modes.IRM, modes.LNM])) + "h")
 
310
        self.parser.dataReceived(
 
311
            "\x1b[" + ';'.join(map(str, [modes.KAM, modes.IRM, modes.LNM])) + "l")
 
312
        occs = occurrences(self.proto)
 
313
 
 
314
        result = self.assertCall(occs.pop(0), "setModes", ([modes.KAM, modes.IRM, modes.LNM],))
 
315
        self.failIf(occurrences(result))
 
316
 
 
317
        result = self.assertCall(occs.pop(0), "resetModes", ([modes.KAM, modes.IRM, modes.LNM],))
 
318
        self.failIf(occurrences(result))
 
319
        self.failIf(occs)
 
320
 
 
321
    def testErasure(self):
 
322
        self.parser.dataReceived(
 
323
            "\x1b[K\x1b[1K\x1b[2K\x1b[J\x1b[1J\x1b[2J\x1b[3P")
 
324
        occs = occurrences(self.proto)
 
325
 
 
326
        for meth in ("eraseToLineEnd", "eraseToLineBeginning", "eraseLine",
 
327
                     "eraseToDisplayEnd", "eraseToDisplayBeginning",
 
328
                     "eraseDisplay"):
 
329
            result = self.assertCall(occs.pop(0), meth)
 
330
            self.failIf(occurrences(result))
 
331
 
 
332
        result = self.assertCall(occs.pop(0), "deleteCharacter", (3,))
 
333
        self.failIf(occurrences(result))
 
334
        self.failIf(occs)
 
335
 
 
336
    def testLineDeletion(self):
 
337
        self.parser.dataReceived("\x1b[M\x1b[3M")
 
338
        occs = occurrences(self.proto)
 
339
 
 
340
        for arg in (1, 3):
 
341
            result = self.assertCall(occs.pop(0), "deleteLine", (arg,))
 
342
            self.failIf(occurrences(result))
 
343
        self.failIf(occs)
 
344
 
 
345
    def testLineInsertion(self):
 
346
        self.parser.dataReceived("\x1b[L\x1b[3L")
 
347
        occs = occurrences(self.proto)
 
348
 
 
349
        for arg in (1, 3):
 
350
            result = self.assertCall(occs.pop(0), "insertLine", (arg,))
 
351
            self.failIf(occurrences(result))
 
352
        self.failIf(occs)
 
353
 
 
354
    def testCursorPosition(self):
 
355
        methods(self.proto)['reportCursorPosition'] = (6, 7)
 
356
        self.parser.dataReceived("\x1b[6n")
 
357
        self.assertEquals(self.transport.value(), "\x1b[7;8R")
 
358
        occs = occurrences(self.proto)
 
359
 
 
360
        result = self.assertCall(occs.pop(0), "reportCursorPosition")
 
361
        # This isn't really an interesting assert, since it only tests that
 
362
        # our mock setup is working right, but I'll include it anyway.
 
363
        self.assertEquals(result, (6, 7))
 
364
 
 
365
 
 
366
    def test_applicationDataBytes(self):
 
367
        """
 
368
        Contiguous non-control bytes are passed to a single call to the
 
369
        C{write} method of the terminal to which the L{ClientProtocol} is
 
370
        connected.
 
371
        """
 
372
        occs = occurrences(self.proto)
 
373
        self.parser.dataReceived('a')
 
374
        self.assertCall(occs.pop(0), "write", ("a",))
 
375
        self.parser.dataReceived('bc')
 
376
        self.assertCall(occs.pop(0), "write", ("bc",))
 
377
 
 
378
 
 
379
    def _applicationDataTest(self, data, calls):
 
380
        occs = occurrences(self.proto)
 
381
        self.parser.dataReceived(data)
 
382
        while calls:
 
383
            self.assertCall(occs.pop(0), *calls.pop(0))
 
384
        self.assertFalse(occs, "No other calls should happen: %r" % (occs,))
 
385
 
 
386
 
 
387
    def test_shiftInAfterApplicationData(self):
 
388
        """
 
389
        Application data bytes followed by a shift-in command are passed to a
 
390
        call to C{write} before the terminal's C{shiftIn} method is called.
 
391
        """
 
392
        self._applicationDataTest(
 
393
            'ab\x15', [
 
394
                ("write", ("ab",)),
 
395
                ("shiftIn",)])
 
396
 
 
397
 
 
398
    def test_shiftOutAfterApplicationData(self):
 
399
        """
 
400
        Application data bytes followed by a shift-out command are passed to a
 
401
        call to C{write} before the terminal's C{shiftOut} method is called.
 
402
        """
 
403
        self._applicationDataTest(
 
404
            'ab\x14', [
 
405
                ("write", ("ab",)),
 
406
                ("shiftOut",)])
 
407
 
 
408
 
 
409
    def test_cursorBackwardAfterApplicationData(self):
 
410
        """
 
411
        Application data bytes followed by a cursor-backward command are passed
 
412
        to a call to C{write} before the terminal's C{cursorBackward} method is
 
413
        called.
 
414
        """
 
415
        self._applicationDataTest(
 
416
            'ab\x08', [
 
417
                ("write", ("ab",)),
 
418
                ("cursorBackward",)])
 
419
 
 
420
 
 
421
    def test_escapeAfterApplicationData(self):
 
422
        """
 
423
        Application data bytes followed by an escape character are passed to a
 
424
        call to C{write} before the terminal's handler method for the escape is
 
425
        called.
 
426
        """
 
427
        # Test a short escape
 
428
        self._applicationDataTest(
 
429
            'ab\x1bD', [
 
430
                ("write", ("ab",)),
 
431
                ("index",)])
 
432
 
 
433
        # And a long escape
 
434
        self._applicationDataTest(
 
435
            'ab\x1b[4h', [
 
436
                ("write", ("ab",)),
 
437
                ("setModes", ([4],))])
 
438
 
 
439
        # There's some other cases too, but they're all handled by the same
 
440
        # codepaths as above.
 
441
 
 
442
 
 
443
 
 
444
class ServerProtocolOutputTests(unittest.TestCase):
 
445
    """
 
446
    Tests for the bytes L{ServerProtocol} writes to its transport when its
 
447
    methods are called.
 
448
    """
 
449
    def test_nextLine(self):
 
450
        """
 
451
        L{ServerProtocol.nextLine} writes C{"\r\n"} to its transport.
 
452
        """
 
453
        # Why doesn't it write ESC E?  Because ESC E is poorly supported.  For
 
454
        # example, gnome-terminal (many different versions) fails to scroll if
 
455
        # it receives ESC E and the cursor is already on the last row.
 
456
        protocol = ServerProtocol()
 
457
        transport = StringTransport()
 
458
        protocol.makeConnection(transport)
 
459
        protocol.nextLine()
 
460
        self.assertEqual(transport.value(), "\r\n")