1
# -*- test-case-name: twisted.conch.test.test_insults -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
5
"""VT102 terminal manipulation
7
API Stability: Unstable
9
@author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
14
from zope.interface import implements, Interface
16
from twisted.internet import protocol, defer, interfaces as iinternet
18
class ITerminalProtocol(Interface):
19
def makeConnection(transport):
20
"""Called with an L{ITerminalTransport} when a connection is established.
23
def keystrokeReceived(keyID, modifier):
24
"""A keystroke was received.
26
Each keystroke corresponds to one invocation of this method.
27
keyID is a string identifier for that key. Printable characters
28
are represented by themselves. Control keys, such as arrows and
29
function keys, are represented with symbolic constants on
33
def terminalSize(width, height):
34
"""Called to indicate the size of the terminal.
36
A terminal of 80x24 should be assumed if this method is not
37
called. This method might not be called for real terminals.
40
def unhandledControlSequence(seq):
41
"""Called when an unsupported control sequence is received.
44
@param seq: The whole control sequence which could not be interpreted.
47
def connectionLost(reason):
48
"""Called when the connection has been lost.
50
reason is a Failure describing why.
53
class TerminalProtocol(object):
54
implements(ITerminalProtocol)
56
def makeConnection(self, terminal):
57
# assert ITerminalTransport.providedBy(transport), "TerminalProtocol.makeConnection must be passed an ITerminalTransport implementor"
58
self.terminal = terminal
61
def connectionMade(self):
62
"""Called after a connection has been established.
65
def keystrokeReceived(self, keyID, modifier):
68
def terminalSize(self, width, height):
71
def unhandledControlSequence(self, seq):
74
def connectionLost(self, reason):
77
class ITerminalTransport(iinternet.ITransport):
79
"""Move the cursor up n lines.
83
"""Move the cursor down n lines.
86
def cursorForward(n=1):
87
"""Move the cursor right n columns.
90
def cursorBackward(n=1):
91
"""Move the cursor left n columns.
94
def cursorPosition(column, line):
95
"""Move the cursor to the given line and column.
99
"""Move the cursor home.
103
"""Move the cursor down one line, performing scrolling if necessary.
107
"""Move the cursor up one line, performing scrolling if necessary.
111
"""Move the cursor to the first position on the next line, performing scrolling if necessary.
115
"""Save the cursor position, character attribute, character set, and origin mode selection.
119
"""Restore the previously saved cursor position, character attribute, character set, and origin mode selection.
121
If no cursor state was previously saved, move the cursor to the home position.
125
"""Set the given modes on the terminal.
128
def resetModes(mode):
129
"""Reset the given modes on the terminal.
132
def applicationKeypadMode():
133
"""Cause keypad to generate control functions.
135
Cursor key mode selects the type of characters generated by cursor keys.
138
def numericKeypadMode():
139
"""Cause keypad to generate normal characters.
142
def selectCharacterSet(charSet, which):
143
"""Select a character set.
145
charSet should be one of CS_US, CS_UK, CS_DRAWING, CS_ALTERNATE, or
146
CS_ALTERNATE_SPECIAL.
148
which should be one of G0 or G1.
152
"""Activate the G0 character set.
156
"""Activate the G1 character set.
160
"""Shift to the G2 character set for a single character.
164
"""Shift to the G3 character set for a single character.
167
def selectGraphicRendition(*attributes):
168
"""Enable one or more character attributes.
170
Arguments should be one or more of UNDERLINE, REVERSE_VIDEO, BLINK, or BOLD.
171
NORMAL may also be specified to disable all character attributes.
174
def horizontalTabulationSet():
175
"""Set a tab stop at the current cursor position.
178
def tabulationClear():
179
"""Clear the tab stop at the current cursor position.
182
def tabulationClearAll():
183
"""Clear all tab stops.
186
def doubleHeightLine(top=True):
187
"""Make the current line the top or bottom half of a double-height, double-width line.
189
If top is True, the current line is the top half. Otherwise, it is the bottom half.
192
def singleWidthLine():
193
"""Make the current line a single-width, single-height line.
196
def doubleWidthLine():
197
"""Make the current line a double-width line.
200
def eraseToLineEnd():
201
"""Erase from the cursor to the end of line, including cursor position.
204
def eraseToLineBeginning():
205
"""Erase from the cursor to the beginning of the line, including the cursor position.
209
"""Erase the entire cursor line.
212
def eraseToDisplayEnd():
213
"""Erase from the cursor to the end of the display, including the cursor position.
216
def eraseToDisplayBeginning():
217
"""Erase from the cursor to the beginning of the display, including the cursor position.
221
"""Erase the entire display.
224
def deleteCharacter(n=1):
225
"""Delete n characters starting at the cursor position.
227
Characters to the right of deleted characters are shifted to the left.
231
"""Insert n lines at the cursor position.
233
Lines below the cursor are shifted down. Lines moved past the bottom margin are lost.
234
This command is ignored when the cursor is outside the scroll region.
238
"""Delete n lines starting at the cursor position.
240
Lines below the cursor are shifted up. This command is ignored when the cursor is outside
244
def reportCursorPosition():
245
"""Return a Deferred that fires with a two-tuple of (x, y) indicating the cursor position.
249
"""Reset the terminal to its initial state.
252
def unhandledControlSequence(seq):
253
"""Called when an unsupported control sequence is received.
256
@param seq: The whole control sequence which could not be interpreted.
264
"""ECMA 48 standardized modes
267
# Keyboard Action Mode.  Setting this locks the keyboard -- it stops
# accepting input -- until the mode is reset.
268
KEYBOARD_ACTION = KAM = 2
270
# When set, enables character insertion. New display characters
271
# move old display characters to the right. Characters moved past
272
# the right margin are lost.
274
# When reset, enables replacement mode (disables character
275
# insertion). New display characters replace old display
276
# characters at cursor position. The old character is erased.
277
INSERTION_REPLACEMENT = IRM = 4
279
# Set causes a received linefeed, form feed, or vertical tab to
280
# move cursor to first column of next line. RETURN transmits both
281
# a carriage return and linefeed. This selection is also called
284
# Reset causes a received linefeed, form feed, or vertical tab to
285
# move cursor to next line in current column. RETURN transmits a
287
LINEFEED_NEWLINE = LNM = 20
291
"""ANSI-Compatible Private Modes
302
PRINTER_FORM_FEED = 18
305
# Toggle cursor visibility (reset hides it)
312
CS_DRAWING = 'CS_DRAWING'
313
CS_ALTERNATE = 'CS_ALTERNATE'
314
CS_ALTERNATE_SPECIAL = 'CS_ALTERNATE_SPECIAL'
316
# Character set designators.  These are like variables that can be bound to character sets.
320
# G2 and G3 cannot be changed, but they can be shifted to.
324
# Character attributes
333
def __init__(self, x, y):
338
file('log', 'a').write(str(s) + '\n')
340
# XXX TODO - These attributes are really part of the
341
# ITerminalTransport interface, I think.
342
_KEY_NAMES = ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
343
'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', 'NUMPAD_MIDDLE',
344
'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
347
'ALT', 'SHIFT', 'CONTROL')
349
class _const(object):
351
@ivar name: A string naming this constant
353
def __init__(self, name):
357
return '[' + self.name + ']'
361
_const(_name) for _name in _KEY_NAMES]
363
class ServerProtocol(protocol.Protocol):
364
implements(ITerminalTransport)
366
protocolFactory = None
367
terminalProtocol = None
377
termSize = Vector(80, 24)
378
cursorPos = Vector(0, 0)
381
# Factory who instantiated me
384
def __init__(self, protocolFactory=None, *a, **kw):
386
@param protocolFactory: A callable which will be invoked with
387
*a, **kw and should return an ITerminalProtocol implementor.
388
This will be invoked when a connection to this ServerProtocol
391
@param a: Any positional arguments to pass to protocolFactory.
392
@param kw: Any keyword arguments to pass to protocolFactory.
394
# assert protocolFactory is None or ITerminalProtocol.implementedBy(protocolFactory), "ServerProtocol.__init__ must be passed an ITerminalProtocol implementor"
395
if protocolFactory is not None:
396
self.protocolFactory = protocolFactory
397
self.protocolArgs = a
398
self.protocolKwArgs = kw
400
self._cursorReports = []
402
def connectionMade(self):
403
if self.protocolFactory is not None:
404
self.terminalProtocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
407
factory = self.factory
408
except AttributeError:
411
self.terminalProtocol.factory = factory
413
self.terminalProtocol.makeConnection(self)
415
def dataReceived(self, data):
417
if self.state == 'data':
419
self.state = 'escaped'
421
self.terminalProtocol.keystrokeReceived(ch, None)
422
elif self.state == 'escaped':
424
self.state = 'bracket-escaped'
427
self.state = 'low-function-escaped'
430
self._handleShortControlSequence(ch)
431
elif self.state == 'bracket-escaped':
433
self.state = 'low-function-escaped'
434
elif ch.isalpha() or ch == '~':
435
self._handleControlSequence(''.join(self.escBuf) + ch)
439
self.escBuf.append(ch)
440
elif self.state == 'low-function-escaped':
441
self._handleLowFunctionControlSequence(ch)
444
raise ValueError("Illegal state")
446
def _handleShortControlSequence(self, ch):
447
self.terminalProtocol.keystrokeReceived(ch, self.ALT)
449
def _handleControlSequence(self, buf):
451
f = getattr(self.controlSequenceParser, CST.get(buf[-1], buf[-1]), None)
453
self.unhandledControlSequence(buf)
455
f(self, self.terminalProtocol, buf[:-1])
457
def unhandledControlSequence(self, buf):
    """Pass a control sequence which could not be interpreted on to the terminal protocol.

    @param buf: The control sequence, forwarded unchanged.
    """
    receiver = self.terminalProtocol
    receiver.unhandledControlSequence(buf)
460
def _handleLowFunctionControlSequence(self, ch):
461
map = {'P': self.F1, 'Q': self.F2, 'R': self.F3, 'S': self.F4}
463
if keyID is not None:
464
self.terminalProtocol.keystrokeReceived(keyID, None)
466
self.terminalProtocol.unhandledControlSequence('\x1b[O' + ch)
468
class ControlSequenceParser:
469
def A(self, proto, handler, buf):
471
handler.keystrokeReceived(proto.UP_ARROW, None)
473
handler.unhandledControlSequence(buf + 'A')
475
def B(self, proto, handler, buf):
477
handler.keystrokeReceived(proto.DOWN_ARROW, None)
479
handler.unhandledControlSequence(buf + 'B')
481
def C(self, proto, handler, buf):
483
handler.keystrokeReceived(proto.RIGHT_ARROW, None)
485
handler.unhandledControlSequence(buf + 'C')
487
def D(self, proto, handler, buf):
489
handler.keystrokeReceived(proto.LEFT_ARROW, None)
491
handler.unhandledControlSequence(buf + 'D')
493
def E(self, proto, handler, buf):
495
handler.keystrokeReceived(proto.NUMPAD_MIDDLE, None)
497
handler.unhandledControlSequence(buf + 'E')
499
def F(self, proto, handler, buf):
501
handler.keystrokeReceived(proto.END, None)
503
handler.unhandledControlSequence(buf + 'F')
505
def H(self, proto, handler, buf):
507
handler.keystrokeReceived(proto.HOME, None)
509
handler.unhandledControlSequence(buf + 'H')
511
def R(self, proto, handler, buf):
512
if not proto._cursorReports:
513
handler.unhandledControlSequence(buf + 'R')
514
elif buf.startswith('\x1b['):
516
parts = report.split(';')
518
handler.unhandledControlSequence(buf + 'R')
522
Pl, Pc = int(Pl), int(Pc)
524
handler.unhandledControlSequence(buf + 'R')
526
d = proto._cursorReports.pop(0)
527
d.callback((Pc - 1, Pl - 1))
529
handler.unhandledControlSequence(buf + 'R')
531
def Z(self, proto, handler, buf):
533
handler.keystrokeReceived(proto.TAB, proto.SHIFT)
535
handler.unhandledControlSequence(buf + 'Z')
537
def tilde(self, proto, handler, buf):
538
map = {1: proto.HOME, 2: proto.INSERT, 3: proto.DELETE,
539
4: proto.END, 5: proto.PGUP, 6: proto.PGDN,
541
15: proto.F5, 17: proto.F6, 18: proto.F7,
542
19: proto.F8, 20: proto.F9, 21: proto.F10,
543
23: proto.F11, 24: proto.F12}
545
if buf.startswith('\x1b['):
550
handler.unhandledControlSequence(buf + '~')
552
symbolic = map.get(v)
553
if symbolic is not None:
554
handler.keystrokeReceived(map[v], None)
556
handler.unhandledControlSequence(buf + '~')
558
handler.unhandledControlSequence(buf + '~')
560
controlSequenceParser = ControlSequenceParser()
563
def cursorUp(self, n=1):
565
self.cursorPos.y = max(self.cursorPos.y - n, 0)
566
self.write('\x1b[%dA' % (n,))
568
def cursorDown(self, n=1):
570
self.cursorPos.y = min(self.cursorPos.y + n, self.termSize.y - 1)
571
self.write('\x1b[%dB' % (n,))
573
def cursorForward(self, n=1):
575
self.cursorPos.x = min(self.cursorPos.x + n, self.termSize.x - 1)
576
self.write('\x1b[%dC' % (n,))
578
def cursorBackward(self, n=1):
580
self.cursorPos.x = max(self.cursorPos.x - n, 0)
581
self.write('\x1b[%dD' % (n,))
583
def cursorPosition(self, column, line):
    """Move the cursor to the given zero-based column and line.

    The tracked C{cursorPos} is updated too, clamped to the terminal size in
    the same way the relative cursor movement methods clamp it, so that it
    does not go stale after an absolute move.

    @param column: Zero-based target column.
    @param line: Zero-based target line.
    """
    self.cursorPos.x = min(max(column, 0), self.termSize.x - 1)
    self.cursorPos.y = min(max(line, 0), self.termSize.y - 1)
    # The control sequence is one-based and names the line before the column.
    self.write('\x1b[%d;%dH' % (line + 1, column + 1))
586
def cursorHome(self):
587
self.cursorPos.x = self.cursorPos.y = 0
591
self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
594
def reverseIndex(self):
595
self.cursorPos.y = max(self.cursorPos.y - 1, 0)
600
self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
603
def saveCursor(self):
604
self._savedCursorPos = Vector(self.cursorPos.x, self.cursorPos.y)
607
def restoreCursor(self):
608
self.cursorPos = self._savedCursorPos
609
del self._savedCursorPos
612
def setModes(self, modes):
    """Write the control sequence which sets every mode in C{modes}.

    @param modes: An iterable of mode values.  Each one is converted with
        C{str} and the results are joined with semicolons.
    """
    # XXX Support ANSI-Compatible private modes
    params = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[%sh' % (params,))
616
def setPrivateModes(self, modes):
    """Write the control sequence which sets every private mode in C{modes}.

    @param modes: An iterable of mode values.  Each one is converted with
        C{str} and the results are joined with semicolons.
    """
    params = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[?%sh' % (params,))
619
def resetModes(self, modes):
    """Write the control sequence which resets every mode in C{modes}.

    @param modes: An iterable of mode values.  Each one is converted with
        C{str} and the results are joined with semicolons.
    """
    # XXX Support ANSI-Compatible private modes
    params = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[%sl' % (params,))
623
def resetPrivateModes(self, modes):
    """Write the control sequence which resets every private mode in C{modes}.

    @param modes: An iterable of mode values.  Each one is converted with
        C{str} and the results are joined with semicolons.
    """
    params = ';'.join([str(mode) for mode in modes])
    self.write('\x1b[?%sl' % (params,))
626
def applicationKeypadMode(self):
629
def numericKeypadMode(self):
632
def selectCharacterSet(self, charSet, which):
633
# XXX Rewrite these as dict lookups
639
raise ValueError("`which' argument to selectCharacterSet must be G0 or G1")
642
elif charSet == CS_US:
644
elif charSet == CS_DRAWING:
646
elif charSet == CS_ALTERNATE:
648
elif charSet == CS_ALTERNATE_SPECIAL:
651
raise ValueError("Invalid `charSet' argument to selectCharacterSet")
652
self.write('\x1b' + which + charSet)
660
def singleShift2(self):
663
def singleShift3(self):
666
def selectGraphicRendition(self, *attributes):
670
self.write('\x1b[%sm' % (';'.join(attrs),))
672
def horizontalTabulationSet(self):
675
def tabulationClear(self):
678
def tabulationClearAll(self):
    """Clear all tab stops.

    Sends the Tabulation Clear control function with parameter 3.  Its final
    byte is C{g}; the C{q} written previously is the VT100 "load LEDs"
    function and does not touch the tab stops.
    """
    self.write('\x1b[3g')
681
def doubleHeightLine(self, top=True):
687
def singleWidthLine(self):
690
def doubleWidthLine(self):
693
def eraseToLineEnd(self):
696
def eraseToLineBeginning(self):
    """Write the sequence erasing from the beginning of the line up to and including the cursor."""
    sequence = '\x1b[1K'
    self.write(sequence)
700
self.write('\x1b[2K')
702
def eraseToDisplayEnd(self):
705
def eraseToDisplayBeginning(self):
    """Write the sequence erasing from the beginning of the display up to and including the cursor."""
    sequence = '\x1b[1J'
    self.write(sequence)
708
def eraseDisplay(self):
    """Write the sequence erasing the entire display."""
    sequence = '\x1b[2J'
    self.write(sequence)
711
def deleteCharacter(self, n=1):
    """Write the sequence deleting C{n} characters starting at the cursor position.

    @param n: The number of characters to delete; formatted as a decimal integer.
    """
    sequence = '\x1b[%dP' % (n,)
    self.write(sequence)
714
def insertLine(self, n=1):
    """Write the sequence inserting C{n} lines at the cursor position.

    @param n: The number of lines to insert; formatted as a decimal integer.
    """
    sequence = '\x1b[%dL' % (n,)
    self.write(sequence)
717
def deleteLine(self, n=1):
    """Write the sequence deleting C{n} lines starting at the cursor position.

    @param n: The number of lines to delete; formatted as a decimal integer.
    """
    sequence = '\x1b[%dM' % (n,)
    self.write(sequence)
720
def setScrollRegion(self, first=None, last=None):
721
if first is not None:
722
first = '%d' % (first,)
726
last = '%d' % (last,)
729
self.write('\x1b[%s;%sr' % (first, last))
731
def resetScrollRegion(self):
    """Reset the scroll region by calling C{setScrollRegion} with no arguments."""
    setRegion = self.setScrollRegion
    setRegion()
734
def reportCursorPosition(self):
736
self._cursorReports.append(d)
737
self.write('\x1b[6n')
741
self.cursorPos.x = self.cursorPos.y = 0
743
del self._savedCursorPos
744
except AttributeError:
749
def write(self, bytes):
751
self.lastWrite = bytes
752
self.transport.write('\r\n'.join(bytes.split('\n')))
754
def writeSequence(self, bytes):
    """Concatenate the strings in C{bytes} and send the result through C{write}.

    @param bytes: An iterable of strings.
    """
    joined = ''.join(bytes)
    self.write(joined)
757
def loseConnection(self):
759
self.transport.loseConnection()
761
def connectionLost(self, reason):
762
if self.terminalProtocol is not None:
764
self.terminalProtocol.connectionLost(reason)
766
self.terminalProtocol = None
767
# Add symbolic names for function keys: each constant in FUNCTION_KEYS is
# published as a ServerProtocol class attribute under the matching name from
# _KEY_NAMES, which is how the input handling code refers to them (for
# example proto.UP_ARROW and self.F1).
768
for name, const in zip(_KEY_NAMES, FUNCTION_KEYS):
769
setattr(ServerProtocol, name, const)
773
class ClientProtocol(protocol.Protocol):
775
terminalFactory = None
787
'8': 'restoreCursor',
788
'=': 'applicationKeypadMode',
789
'>': 'numericKeypadMode',
792
'H': 'horizontalTabulationSet',
796
'[': 'bracket-escape',
799
'#': 'select-height-width'}
806
'2': CS_ALTERNATE_SPECIAL}
808
# Factory who instantiated me
811
def __init__(self, terminalFactory=None, *a, **kw):
813
@param terminalFactory: A callable which will be invoked with
814
*a, **kw and should return an ITerminalTransport provider.
815
This will be invoked when this ClientProtocol establishes a
818
@param a: Any positional arguments to pass to terminalFactory.
819
@param kw: Any keyword arguments to pass to terminalFactory.
821
# assert terminalFactory is None or ITerminalTransport.implementedBy(terminalFactory), "ClientProtocol.__init__ must be passed an ITerminalTransport implementor"
822
if terminalFactory is not None:
823
self.terminalFactory = terminalFactory
824
self.terminalArgs = a
825
self.terminalKwArgs = kw
827
def connectionMade(self):
    """Create the terminal and connect it to this protocol.

    Nothing happens when no C{terminalFactory} has been configured.
    Otherwise the factory is invoked with the stored positional and keyword
    arguments and the result is saved as C{self.terminal}.  The terminal is
    given this protocol's C{factory}, when there is one, and is then
    connected to this protocol with C{makeConnection}.
    """
    if self.terminalFactory is None:
        return
    self.terminal = self.terminalFactory(*self.terminalArgs, **self.terminalKwArgs)
    # Tolerate a protocol which has no factory attribute, the same way
    # ServerProtocol.connectionMade does, instead of failing before the
    # terminal has been connected.
    try:
        factory = self.factory
    except AttributeError:
        pass
    else:
        self.terminal.factory = factory
    self.terminal.makeConnection(self)
833
def connectionLost(self, reason):
834
if self.terminal is not None:
836
self.terminal.connectionLost(reason)
840
def dataReceived(self, bytes):
842
if self.state == 'data':
844
self.state = 'escaped'
846
self.terminal.shiftOut()
848
self.terminal.shiftIn()
850
self.terminal.cursorBackward()
852
self.terminal.write(b)
853
elif self.state == 'escaped':
854
fName = self._shorts.get(b)
855
if fName is not None:
857
getattr(self.terminal, fName)()
859
state = self._longs.get(b)
860
if state is not None:
863
self.terminal.unhandledControlSequence('\x1b' + b)
865
elif self.state == 'bracket-escape':
866
if self._escBuf is None:
868
if b.isalpha() or b == '~':
869
self._handleControlSequence(''.join(self._escBuf), b)
873
self._escBuf.append(b)
874
elif self.state == 'select-g0':
875
self.terminal.selectCharacterSet(self._charsets.get(b, b), G0)
877
elif self.state == 'select-g1':
878
self.terminal.selectCharacterSet(self._charsets.get(b, b), G1)
880
elif self.state == 'select-height-width':
881
self._handleHeightWidth(b)
884
raise ValueError("Illegal state")
886
def _handleControlSequence(self, buf, terminal):
887
f = getattr(self.controlSequenceParser, CST.get(terminal, terminal), None)
889
self.terminal.unhandledControlSequence('\x1b[' + buf + terminal)
891
f(self, self.terminal, buf)
893
class ControlSequenceParser:
894
def _makeSimple(ch, fName):
896
def simple(self, proto, handler, buf):
898
getattr(handler, n)(1)
903
handler.unhandledControlSequence('\x1b[' + buf + ch)
905
getattr(handler, n)(m)
907
for (ch, fName) in (('A', 'Up'),
911
exec ch + " = _makeSimple(ch, fName)"
914
def h(self, proto, handler, buf):
915
# XXX - Handle '?' to introduce ANSI-Compatible private modes.
917
modes = map(int, buf.split(';'))
919
handler.unhandledControlSequence('\x1b[' + buf + 'h')
921
handler.setModes(modes)
923
def l(self, proto, handler, buf):
924
# XXX - Handle '?' to introduce ANSI-Compatible private modes.
926
modes = map(int, buf.split(';'))
928
handler.unhandledControlSequence('\x1b[' + buf + 'l')
930
handler.resetModes(modes)
932
def r(self, proto, handler, buf):
933
parts = buf.split(';')
935
handler.setScrollRegion(None, None)
936
elif len(parts) == 2:
947
handler.unhandledControlSequence('\x1b[' + buf + 'r')
949
handler.setScrollRegion(pt, pb)
951
handler.unhandledControlSequence('\x1b[' + buf + 'r')
953
def K(self, proto, handler, buf):
955
handler.eraseToLineEnd()
957
handler.eraseToLineBeginning()
961
handler.unhandledControlSequence('\x1b[' + buf + 'K')
963
def H(self, proto, handler, buf):
966
def J(self, proto, handler, buf):
968
handler.eraseToDisplayEnd()
970
handler.eraseToDisplayBeginning()
972
handler.eraseDisplay()
974
handler.unhandledControlSequence('\x1b[' + buf + 'J')
976
def P(self, proto, handler, buf):
978
handler.deleteCharacter(1)
983
handler.unhandledControlSequence('\x1b[' + buf + 'P')
985
handler.deleteCharacter(n)
987
def L(self, proto, handler, buf):
989
handler.insertLine(1)
994
handler.unhandledControlSequence('\x1b[' + buf + 'L')
996
handler.insertLine(n)
998
def M(self, proto, handler, buf):
1000
handler.deleteLine(1)
1005
handler.unhandledControlSequence('\x1b[' + buf + 'M')
1007
handler.deleteLine(n)
1009
def n(self, proto, handler, buf):
1011
x, y = handler.reportCursorPosition()
1012
proto.transport.write('\x1b[%d;%dR' % (x + 1, y + 1))
1014
handler.unhandledControlSequence('\x1b[' + buf + 'n')
1016
def m(self, proto, handler, buf):
1018
handler.selectGraphicRendition(NORMAL)
1021
for a in buf.split(';'):
1027
handler.selectGraphicRendition(*attrs)
1029
controlSequenceParser = ControlSequenceParser()
1031
def _handleHeightWidth(self, b):
1033
self.terminal.doubleHeightLine(True)
1035
self.terminal.doubleHeightLine(False)
1037
self.terminal.singleWidthLine()
1039
self.terminal.doubleWidthLine()
1041
self.terminal.unhandledControlSequence('\x1b#' + b)
1046
'ITerminalProtocol', 'ITerminalTransport',
1048
# Symbolic constants
1049
'modes', 'privateModes', 'FUNCTION_KEYS',
1051
'CS_US', 'CS_UK', 'CS_DRAWING', 'CS_ALTERNATE', 'CS_ALTERNATE_SPECIAL',
1052
'G0', 'G1', 'G2', 'G3',
1054
'UNDERLINE', 'REVERSE_VIDEO', 'BLINK', 'BOLD', 'NORMAL',
1057
'ServerProtocol', 'ClientProtocol']