1
# -*- test-case-name: twisted.conch.test.test_insults -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
VT102 and VT220 terminal manipulation.
11
from zope.interface import implements, Interface
13
from twisted.internet import protocol, defer, interfaces as iinternet
15
class ITerminalProtocol(Interface):
16
def makeConnection(transport):
17
"""Called with an L{ITerminalTransport} when a connection is established.
20
def keystrokeReceived(keyID, modifier):
21
"""A keystroke was received.
23
Each keystroke corresponds to one invocation of this method.
24
keyID is a string identifier for that key. Printable characters
25
are represented by themselves. Control keys, such as arrows and
26
function keys, are represented with symbolic constants on
30
def terminalSize(width, height):
31
"""Called to indicate the size of the terminal.
33
A terminal of 80x24 should be assumed if this method is not
34
called. This method might not be called for real terminals.
37
def unhandledControlSequence(seq):
38
"""Called when an unsupported control sequence is received.
41
@param seq: The whole control sequence which could not be interpreted.
44
def connectionLost(reason):
45
"""Called when the connection has been lost.
47
reason is a Failure describing why.
50
class TerminalProtocol(object):
51
implements(ITerminalProtocol)
53
def makeConnection(self, terminal):
54
# assert ITerminalTransport.providedBy(transport), "TerminalProtocol.makeConnection must be passed an ITerminalTransport implementor"
55
self.terminal = terminal
58
def connectionMade(self):
59
"""Called after a connection has been established.
62
def keystrokeReceived(self, keyID, modifier):
65
def terminalSize(self, width, height):
68
def unhandledControlSequence(self, seq):
71
def connectionLost(self, reason):
74
class ITerminalTransport(iinternet.ITransport):
76
"""Move the cursor up n lines.
80
"""Move the cursor down n lines.
83
def cursorForward(n=1):
84
"""Move the cursor right n columns.
87
def cursorBackward(n=1):
88
"""Move the cursor left n columns.
91
def cursorPosition(column, line):
92
"""Move the cursor to the given line and column.
96
"""Move the cursor home.
100
"""Move the cursor down one line, performing scrolling if necessary.
104
"""Move the cursor up one line, performing scrolling if necessary.
108
"""Move the cursor to the first position on the next line, performing scrolling if necessary.
112
"""Save the cursor position, character attribute, character set, and origin mode selection.
116
"""Restore the previously saved cursor position, character attribute, character set, and origin mode selection.
118
If no cursor state was previously saved, move the cursor to the home position.
122
"""Set the given modes on the terminal.
125
def resetModes(mode):
126
"""Reset the given modes on the terminal.
130
def setPrivateModes(modes):
132
Set the given DEC private modes on the terminal.
136
def resetPrivateModes(modes):
138
Reset the given DEC private modes on the terminal.
142
def applicationKeypadMode():
143
"""Cause keypad to generate control functions.
145
Cursor key mode selects the type of characters generated by cursor keys.
148
def numericKeypadMode():
149
"""Cause keypad to generate normal characters.
152
def selectCharacterSet(charSet, which):
153
"""Select a character set.
155
charSet should be one of CS_US, CS_UK, CS_DRAWING, CS_ALTERNATE, or
156
CS_ALTERNATE_SPECIAL.
158
which should be one of G0 or G1.
162
"""Activate the G0 character set.
166
"""Activate the G1 character set.
170
"""Shift to the G2 character set for a single character.
174
"""Shift to the G3 character set for a single character.
177
def selectGraphicRendition(*attributes):
178
"""Enabled one or more character attributes.
180
Arguments should be one or more of UNDERLINE, REVERSE_VIDEO, BLINK, or BOLD.
181
NORMAL may also be specified to disable all character attributes.
184
def horizontalTabulationSet():
185
"""Set a tab stop at the current cursor position.
188
def tabulationClear():
189
"""Clear the tab stop at the current cursor position.
192
def tabulationClearAll():
193
"""Clear all tab stops.
196
def doubleHeightLine(top=True):
197
"""Make the current line the top or bottom half of a double-height, double-width line.
199
If top is True, the current line is the top half. Otherwise, it is the bottom half.
202
def singleWidthLine():
203
"""Make the current line a single-width, single-height line.
206
def doubleWidthLine():
207
"""Make the current line a double-width line.
210
def eraseToLineEnd():
211
"""Erase from the cursor to the end of line, including cursor position.
214
def eraseToLineBeginning():
215
"""Erase from the cursor to the beginning of the line, including the cursor position.
219
"""Erase the entire cursor line.
222
def eraseToDisplayEnd():
223
"""Erase from the cursor to the end of the display, including the cursor position.
226
def eraseToDisplayBeginning():
227
"""Erase from the cursor to the beginning of the display, including the cursor position.
231
"""Erase the entire display.
234
def deleteCharacter(n=1):
235
"""Delete n characters starting at the cursor position.
237
Characters to the right of deleted characters are shifted to the left.
241
"""Insert n lines at the cursor position.
243
Lines below the cursor are shifted down. Lines moved past the bottom margin are lost.
244
This command is ignored when the cursor is outside the scroll region.
248
"""Delete n lines starting at the cursor position.
250
Lines below the cursor are shifted up. This command is ignored when the cursor is outside
254
def reportCursorPosition():
255
"""Return a Deferred that fires with a two-tuple of (x, y) indicating the cursor position.
259
"""Reset the terminal to its initial state.
262
def unhandledControlSequence(seq):
263
"""Called when an unsupported control sequence is received.
266
@param seq: The whole control sequence which could not be interpreted.
274
"""ECMA 48 standardized modes
277
# BREAKS YOPUR KEYBOARD MOFO
278
KEYBOARD_ACTION = KAM = 2
280
# When set, enables character insertion. New display characters
281
# move old display characters to the right. Characters moved past
282
# the right margin are lost.
284
# When reset, enables replacement mode (disables character
285
# insertion). New display characters replace old display
286
# characters at cursor position. The old character is erased.
287
INSERTION_REPLACEMENT = IRM = 4
289
# Set causes a received linefeed, form feed, or vertical tab to
290
# move cursor to first column of next line. RETURN transmits both
291
# a carriage return and linefeed. This selection is also called
294
# Reset causes a received linefeed, form feed, or vertical tab to
295
# move cursor to next line in current column. RETURN transmits a
297
LINEFEED_NEWLINE = LNM = 20
301
"""ANSI-Compatible Private Modes
312
PRINTER_FORM_FEED = 18
315
# Toggle cursor visibility (reset hides it)
322
CS_DRAWING = 'CS_DRAWING'
323
CS_ALTERNATE = 'CS_ALTERNATE'
324
CS_ALTERNATE_SPECIAL = 'CS_ALTERNATE_SPECIAL'
326
# Groupings (or something?? These are like variables that can be bound to character sets)
330
# G2 and G3 cannot be changed, but they can be shifted to.
334
# Character attributes
343
def __init__(self, x, y):
348
file('log', 'a').write(str(s) + '\n')
350
# XXX TODO - These attributes are really part of the
351
# ITerminalTransport interface, I think.
352
_KEY_NAMES = ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
353
'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', 'NUMPAD_MIDDLE',
354
'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
357
'ALT', 'SHIFT', 'CONTROL')
359
class _const(object):
361
@ivar name: A string naming this constant
363
def __init__(self, name):
367
return '[' + self.name + ']'
371
_const(_name) for _name in _KEY_NAMES]
373
class ServerProtocol(protocol.Protocol):
374
implements(ITerminalTransport)
376
protocolFactory = None
377
terminalProtocol = None
387
termSize = Vector(80, 24)
388
cursorPos = Vector(0, 0)
391
# Factory who instantiated me
394
def __init__(self, protocolFactory=None, *a, **kw):
396
@param protocolFactory: A callable which will be invoked with
397
*a, **kw and should return an ITerminalProtocol implementor.
398
This will be invoked when a connection to this ServerProtocol
401
@param a: Any positional arguments to pass to protocolFactory.
402
@param kw: Any keyword arguments to pass to protocolFactory.
404
# assert protocolFactory is None or ITerminalProtocol.implementedBy(protocolFactory), "ServerProtocol.__init__ must be passed an ITerminalProtocol implementor"
405
if protocolFactory is not None:
406
self.protocolFactory = protocolFactory
407
self.protocolArgs = a
408
self.protocolKwArgs = kw
410
self._cursorReports = []
412
def connectionMade(self):
413
if self.protocolFactory is not None:
414
self.terminalProtocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
417
factory = self.factory
418
except AttributeError:
421
self.terminalProtocol.factory = factory
423
self.terminalProtocol.makeConnection(self)
425
def dataReceived(self, data):
427
if self.state == 'data':
429
self.state = 'escaped'
431
self.terminalProtocol.keystrokeReceived(ch, None)
432
elif self.state == 'escaped':
434
self.state = 'bracket-escaped'
437
self.state = 'low-function-escaped'
440
self._handleShortControlSequence(ch)
441
elif self.state == 'bracket-escaped':
443
self.state = 'low-function-escaped'
444
elif ch.isalpha() or ch == '~':
445
self._handleControlSequence(''.join(self.escBuf) + ch)
449
self.escBuf.append(ch)
450
elif self.state == 'low-function-escaped':
451
self._handleLowFunctionControlSequence(ch)
454
raise ValueError("Illegal state")
456
def _handleShortControlSequence(self, ch):
457
self.terminalProtocol.keystrokeReceived(ch, self.ALT)
459
def _handleControlSequence(self, buf):
461
f = getattr(self.controlSequenceParser, CST.get(buf[-1], buf[-1]), None)
463
self.unhandledControlSequence(buf)
465
f(self, self.terminalProtocol, buf[:-1])
467
def unhandledControlSequence(self, buf):
468
self.terminalProtocol.unhandledControlSequence(buf)
470
def _handleLowFunctionControlSequence(self, ch):
471
map = {'P': self.F1, 'Q': self.F2, 'R': self.F3, 'S': self.F4}
473
if keyID is not None:
474
self.terminalProtocol.keystrokeReceived(keyID, None)
476
self.terminalProtocol.unhandledControlSequence('\x1b[O' + ch)
478
class ControlSequenceParser:
479
def A(self, proto, handler, buf):
481
handler.keystrokeReceived(proto.UP_ARROW, None)
483
handler.unhandledControlSequence(buf + 'A')
485
def B(self, proto, handler, buf):
487
handler.keystrokeReceived(proto.DOWN_ARROW, None)
489
handler.unhandledControlSequence(buf + 'B')
491
def C(self, proto, handler, buf):
493
handler.keystrokeReceived(proto.RIGHT_ARROW, None)
495
handler.unhandledControlSequence(buf + 'C')
497
def D(self, proto, handler, buf):
499
handler.keystrokeReceived(proto.LEFT_ARROW, None)
501
handler.unhandledControlSequence(buf + 'D')
503
def E(self, proto, handler, buf):
505
handler.keystrokeReceived(proto.NUMPAD_MIDDLE, None)
507
handler.unhandledControlSequence(buf + 'E')
509
def F(self, proto, handler, buf):
511
handler.keystrokeReceived(proto.END, None)
513
handler.unhandledControlSequence(buf + 'F')
515
def H(self, proto, handler, buf):
517
handler.keystrokeReceived(proto.HOME, None)
519
handler.unhandledControlSequence(buf + 'H')
521
def R(self, proto, handler, buf):
522
if not proto._cursorReports:
523
handler.unhandledControlSequence(buf + 'R')
524
elif buf.startswith('\x1b['):
526
parts = report.split(';')
528
handler.unhandledControlSequence(buf + 'R')
532
Pl, Pc = int(Pl), int(Pc)
534
handler.unhandledControlSequence(buf + 'R')
536
d = proto._cursorReports.pop(0)
537
d.callback((Pc - 1, Pl - 1))
539
handler.unhandledControlSequence(buf + 'R')
541
def Z(self, proto, handler, buf):
543
handler.keystrokeReceived(proto.TAB, proto.SHIFT)
545
handler.unhandledControlSequence(buf + 'Z')
547
def tilde(self, proto, handler, buf):
548
map = {1: proto.HOME, 2: proto.INSERT, 3: proto.DELETE,
549
4: proto.END, 5: proto.PGUP, 6: proto.PGDN,
551
15: proto.F5, 17: proto.F6, 18: proto.F7,
552
19: proto.F8, 20: proto.F9, 21: proto.F10,
553
23: proto.F11, 24: proto.F12}
555
if buf.startswith('\x1b['):
560
handler.unhandledControlSequence(buf + '~')
562
symbolic = map.get(v)
563
if symbolic is not None:
564
handler.keystrokeReceived(map[v], None)
566
handler.unhandledControlSequence(buf + '~')
568
handler.unhandledControlSequence(buf + '~')
570
controlSequenceParser = ControlSequenceParser()
573
def cursorUp(self, n=1):
575
self.cursorPos.y = max(self.cursorPos.y - n, 0)
576
self.write('\x1b[%dA' % (n,))
578
def cursorDown(self, n=1):
580
self.cursorPos.y = min(self.cursorPos.y + n, self.termSize.y - 1)
581
self.write('\x1b[%dB' % (n,))
583
def cursorForward(self, n=1):
585
self.cursorPos.x = min(self.cursorPos.x + n, self.termSize.x - 1)
586
self.write('\x1b[%dC' % (n,))
588
def cursorBackward(self, n=1):
590
self.cursorPos.x = max(self.cursorPos.x - n, 0)
591
self.write('\x1b[%dD' % (n,))
593
def cursorPosition(self, column, line):
594
self.write('\x1b[%d;%dH' % (line + 1, column + 1))
596
def cursorHome(self):
597
self.cursorPos.x = self.cursorPos.y = 0
601
self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
604
def reverseIndex(self):
605
self.cursorPos.y = max(self.cursorPos.y - 1, 0)
610
self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
613
def saveCursor(self):
614
self._savedCursorPos = Vector(self.cursorPos.x, self.cursorPos.y)
617
def restoreCursor(self):
618
self.cursorPos = self._savedCursorPos
619
del self._savedCursorPos
622
def setModes(self, modes):
623
# XXX Support ANSI-Compatible private modes
624
self.write('\x1b[%sh' % (';'.join(map(str, modes)),))
626
def setPrivateModes(self, modes):
627
self.write('\x1b[?%sh' % (';'.join(map(str, modes)),))
629
def resetModes(self, modes):
630
# XXX Support ANSI-Compatible private modes
631
self.write('\x1b[%sl' % (';'.join(map(str, modes)),))
633
def resetPrivateModes(self, modes):
634
self.write('\x1b[?%sl' % (';'.join(map(str, modes)),))
636
def applicationKeypadMode(self):
639
def numericKeypadMode(self):
642
def selectCharacterSet(self, charSet, which):
643
# XXX Rewrite these as dict lookups
649
raise ValueError("`which' argument to selectCharacterSet must be G0 or G1")
652
elif charSet == CS_US:
654
elif charSet == CS_DRAWING:
656
elif charSet == CS_ALTERNATE:
658
elif charSet == CS_ALTERNATE_SPECIAL:
661
raise ValueError("Invalid `charSet' argument to selectCharacterSet")
662
self.write('\x1b' + which + charSet)
670
def singleShift2(self):
673
def singleShift3(self):
676
def selectGraphicRendition(self, *attributes):
680
self.write('\x1b[%sm' % (';'.join(attrs),))
682
def horizontalTabulationSet(self):
685
def tabulationClear(self):
688
def tabulationClearAll(self):
689
self.write('\x1b[3q')
691
def doubleHeightLine(self, top=True):
697
def singleWidthLine(self):
700
def doubleWidthLine(self):
703
def eraseToLineEnd(self):
706
def eraseToLineBeginning(self):
707
self.write('\x1b[1K')
710
self.write('\x1b[2K')
712
def eraseToDisplayEnd(self):
715
def eraseToDisplayBeginning(self):
716
self.write('\x1b[1J')
718
def eraseDisplay(self):
719
self.write('\x1b[2J')
721
def deleteCharacter(self, n=1):
722
self.write('\x1b[%dP' % (n,))
724
def insertLine(self, n=1):
725
self.write('\x1b[%dL' % (n,))
727
def deleteLine(self, n=1):
728
self.write('\x1b[%dM' % (n,))
730
def setScrollRegion(self, first=None, last=None):
731
if first is not None:
732
first = '%d' % (first,)
736
last = '%d' % (last,)
739
self.write('\x1b[%s;%sr' % (first, last))
741
def resetScrollRegion(self):
742
self.setScrollRegion()
744
def reportCursorPosition(self):
746
self._cursorReports.append(d)
747
self.write('\x1b[6n')
751
self.cursorPos.x = self.cursorPos.y = 0
753
del self._savedCursorPos
754
except AttributeError:
759
def write(self, bytes):
761
self.lastWrite = bytes
762
self.transport.write('\r\n'.join(bytes.split('\n')))
764
def writeSequence(self, bytes):
765
self.write(''.join(bytes))
767
def loseConnection(self):
769
self.transport.loseConnection()
771
def connectionLost(self, reason):
772
if self.terminalProtocol is not None:
774
self.terminalProtocol.connectionLost(reason)
776
self.terminalProtocol = None
777
# Add symbolic names for function keys
778
for name, const in zip(_KEY_NAMES, FUNCTION_KEYS):
779
setattr(ServerProtocol, name, const)
783
class ClientProtocol(protocol.Protocol):
785
terminalFactory = None
797
'8': 'restoreCursor',
798
'=': 'applicationKeypadMode',
799
'>': 'numericKeypadMode',
802
'H': 'horizontalTabulationSet',
806
'[': 'bracket-escape',
809
'#': 'select-height-width'}
816
'2': CS_ALTERNATE_SPECIAL}
818
# Factory who instantiated me
821
def __init__(self, terminalFactory=None, *a, **kw):
823
@param terminalFactory: A callable which will be invoked with
824
*a, **kw and should return an ITerminalTransport provider.
825
This will be invoked when this ClientProtocol establishes a
828
@param a: Any positional arguments to pass to terminalFactory.
829
@param kw: Any keyword arguments to pass to terminalFactory.
831
# assert terminalFactory is None or ITerminalTransport.implementedBy(terminalFactory), "ClientProtocol.__init__ must be passed an ITerminalTransport implementor"
832
if terminalFactory is not None:
833
self.terminalFactory = terminalFactory
834
self.terminalArgs = a
835
self.terminalKwArgs = kw
837
def connectionMade(self):
838
if self.terminalFactory is not None:
839
self.terminal = self.terminalFactory(*self.terminalArgs, **self.terminalKwArgs)
840
self.terminal.factory = self.factory
841
self.terminal.makeConnection(self)
843
def connectionLost(self, reason):
844
if self.terminal is not None:
846
self.terminal.connectionLost(reason)
850
def dataReceived(self, bytes):
852
Parse the given data from a terminal server, dispatching to event
853
handlers defined by C{self.terminal}.
857
if self.state == 'data':
860
self.terminal.write(''.join(toWrite))
862
self.state = 'escaped'
865
self.terminal.write(''.join(toWrite))
867
self.terminal.shiftOut()
870
self.terminal.write(''.join(toWrite))
872
self.terminal.shiftIn()
875
self.terminal.write(''.join(toWrite))
877
self.terminal.cursorBackward()
880
elif self.state == 'escaped':
881
fName = self._shorts.get(b)
882
if fName is not None:
884
getattr(self.terminal, fName)()
886
state = self._longs.get(b)
887
if state is not None:
890
self.terminal.unhandledControlSequence('\x1b' + b)
892
elif self.state == 'bracket-escape':
893
if self._escBuf is None:
895
if b.isalpha() or b == '~':
896
self._handleControlSequence(''.join(self._escBuf), b)
900
self._escBuf.append(b)
901
elif self.state == 'select-g0':
902
self.terminal.selectCharacterSet(self._charsets.get(b, b), G0)
904
elif self.state == 'select-g1':
905
self.terminal.selectCharacterSet(self._charsets.get(b, b), G1)
907
elif self.state == 'select-height-width':
908
self._handleHeightWidth(b)
911
raise ValueError("Illegal state")
913
self.terminal.write(''.join(toWrite))
916
def _handleControlSequence(self, buf, terminal):
917
f = getattr(self.controlSequenceParser, CST.get(terminal, terminal), None)
919
self.terminal.unhandledControlSequence('\x1b[' + buf + terminal)
921
f(self, self.terminal, buf)
923
class ControlSequenceParser:
924
def _makeSimple(ch, fName):
926
def simple(self, proto, handler, buf):
928
getattr(handler, n)(1)
933
handler.unhandledControlSequence('\x1b[' + buf + ch)
935
getattr(handler, n)(m)
937
for (ch, fName) in (('A', 'Up'),
941
exec ch + " = _makeSimple(ch, fName)"
944
def h(self, proto, handler, buf):
945
# XXX - Handle '?' to introduce ANSI-Compatible private modes.
947
modes = map(int, buf.split(';'))
949
handler.unhandledControlSequence('\x1b[' + buf + 'h')
951
handler.setModes(modes)
953
def l(self, proto, handler, buf):
954
# XXX - Handle '?' to introduce ANSI-Compatible private modes.
956
modes = map(int, buf.split(';'))
958
handler.unhandledControlSequence('\x1b[' + buf + 'l')
960
handler.resetModes(modes)
962
def r(self, proto, handler, buf):
963
parts = buf.split(';')
965
handler.setScrollRegion(None, None)
966
elif len(parts) == 2:
977
handler.unhandledControlSequence('\x1b[' + buf + 'r')
979
handler.setScrollRegion(pt, pb)
981
handler.unhandledControlSequence('\x1b[' + buf + 'r')
983
def K(self, proto, handler, buf):
985
handler.eraseToLineEnd()
987
handler.eraseToLineBeginning()
991
handler.unhandledControlSequence('\x1b[' + buf + 'K')
993
def H(self, proto, handler, buf):
996
def J(self, proto, handler, buf):
998
handler.eraseToDisplayEnd()
1000
handler.eraseToDisplayBeginning()
1002
handler.eraseDisplay()
1004
handler.unhandledControlSequence('\x1b[' + buf + 'J')
1006
def P(self, proto, handler, buf):
1008
handler.deleteCharacter(1)
1013
handler.unhandledControlSequence('\x1b[' + buf + 'P')
1015
handler.deleteCharacter(n)
1017
def L(self, proto, handler, buf):
1019
handler.insertLine(1)
1024
handler.unhandledControlSequence('\x1b[' + buf + 'L')
1026
handler.insertLine(n)
1028
def M(self, proto, handler, buf):
1030
handler.deleteLine(1)
1035
handler.unhandledControlSequence('\x1b[' + buf + 'M')
1037
handler.deleteLine(n)
1039
def n(self, proto, handler, buf):
1041
x, y = handler.reportCursorPosition()
1042
proto.transport.write('\x1b[%d;%dR' % (x + 1, y + 1))
1044
handler.unhandledControlSequence('\x1b[' + buf + 'n')
1046
def m(self, proto, handler, buf):
1048
handler.selectGraphicRendition(NORMAL)
1051
for a in buf.split(';'):
1057
handler.selectGraphicRendition(*attrs)
1059
controlSequenceParser = ControlSequenceParser()
1061
def _handleHeightWidth(self, b):
1063
self.terminal.doubleHeightLine(True)
1065
self.terminal.doubleHeightLine(False)
1067
self.terminal.singleWidthLine()
1069
self.terminal.doubleWidthLine()
1071
self.terminal.unhandledControlSequence('\x1b#' + b)
1076
'ITerminalProtocol', 'ITerminalTransport',
1078
# Symbolic constants
1079
'modes', 'privateModes', 'FUNCTION_KEYS',
1081
'CS_US', 'CS_UK', 'CS_DRAWING', 'CS_ALTERNATE', 'CS_ALTERNATE_SPECIAL',
1082
'G0', 'G1', 'G2', 'G3',
1084
'UNDERLINE', 'REVERSE_VIDEO', 'BLINK', 'BOLD', 'NORMAL',
1087
'ServerProtocol', 'ClientProtocol']