1
# -*- test-case-name: twisted.conch.test.test_recvline -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Basic line editing support.
13
from zope.interface import implements
15
from twisted.conch.insults import insults, helper
17
from twisted.python import log, reflect
20
class Logging(object):
21
"""Wrapper which logs attribute lookups.
23
This was useful in debugging something, I guess. I forget what.
24
It can probably be deleted or moved somewhere more appropriate.
25
Nothing special going on here, really.
27
def __init__(self, original):
28
self.original = original
29
key = reflect.qual(original.__class__)
30
count = _counters.get(key, 0)
31
_counters[key] = count + 1
32
self._logFile = file(key + '-' + str(count), 'w')
35
return str(super(Logging, self).__getattribute__('original'))
38
return repr(super(Logging, self).__getattribute__('original'))
40
def __getattribute__(self, name):
41
original = super(Logging, self).__getattribute__('original')
42
logFile = super(Logging, self).__getattribute__('_logFile')
43
logFile.write(name + '\n')
44
return getattr(original, name)
46
class TransportSequence(object):
47
"""An L{ITerminalTransport} implementation which forwards calls to
48
one or more other L{ITerminalTransport}s.
50
This is a cheap way for servers to keep track of the state they
51
expect the client to see, since all terminal manipulations can be
52
send to the real client and to a terminal emulator that lives in
55
implements(insults.ITerminalTransport)
57
for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
58
'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN',
59
'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
61
exec '%s = object()' % (keyID,)
66
def __init__(self, *transports):
67
assert transports, "Cannot construct a TransportSequence with no transports"
68
self.transports = transports
70
for method in insults.ITerminalTransport:
72
def %s(self, *a, **kw):
73
for tpt in self.transports:
74
result = tpt.%s(*a, **kw)
76
""" % (method, method)
78
class LocalTerminalBufferMixin(object):
79
"""A mixin for RecvLine subclasses which records the state of the terminal.
81
This is accomplished by performing all L{ITerminalTransport} operations on both
82
the transport passed to makeConnection and an instance of helper.TerminalBuffer.
84
@ivar terminalCopy: A L{helper.TerminalBuffer} instance which efforts
85
will be made to keep up to date with the actual terminal
86
associated with this protocol instance.
89
def makeConnection(self, transport):
90
self.terminalCopy = helper.TerminalBuffer()
91
self.terminalCopy.connectionMade()
92
return super(LocalTerminalBufferMixin, self).makeConnection(
93
TransportSequence(transport, self.terminalCopy))
96
return str(self.terminalCopy)
98
class RecvLine(insults.TerminalProtocol):
99
"""L{TerminalProtocol} which adds line editing features.
101
Clients will be prompted for lines of input with all the usual
102
features: character echoing, left and right arrow support for
103
moving the cursor to different areas of the line buffer, backspace
104
and delete for removing characters, and insert for toggling
105
between typeover and insert mode. Tabs will be expanded to enough
106
spaces to move the cursor to the next tabstop (every four
107
characters by default). Enter causes the line buffer to be
108
cleared and the line to be passed to the lineReceived() method
109
which, by default, does nothing. Subclasses are responsible for
110
redrawing the input prompt (this will probably change).
117
ps = ('>>> ', '... ')
120
def connectionMade(self):
121
# A list containing the characters making up the current line
124
# A zero-based (wtf else?) index into self.lineBuffer.
125
# Indicates the current cursor position.
126
self.lineBufferIndex = 0
129
# A map of keyIDs to bound instance methods.
131
t.LEFT_ARROW: self.handle_LEFT,
132
t.RIGHT_ARROW: self.handle_RIGHT,
133
t.TAB: self.handle_TAB,
135
# Both of these should not be necessary, but figuring out
136
# which is necessary is a huge hassle.
137
'\r': self.handle_RETURN,
138
'\n': self.handle_RETURN,
140
t.BACKSPACE: self.handle_BACKSPACE,
141
t.DELETE: self.handle_DELETE,
142
t.INSERT: self.handle_INSERT,
143
t.HOME: self.handle_HOME,
144
t.END: self.handle_END}
146
self.initializeScreen()
148
def initializeScreen(self):
149
# Hmm, state sucks. Oh well.
150
# For now we will just take over the whole terminal.
151
self.terminal.reset()
152
self.terminal.write(self.ps[self.pn])
153
# XXX Note: I would prefer to default to starting in insert
154
# mode, however this does not seem to actually work! I do not
155
# know why. This is probably of interest to implementors
156
# subclassing RecvLine.
158
# XXX XXX Note: But the unit tests all expect the initial mode
159
# to be insert right now. Fuck, there needs to be a way to
160
# query the current mode or something.
161
# self.setTypeoverMode()
164
def currentLineBuffer(self):
165
s = ''.join(self.lineBuffer)
166
return s[:self.lineBufferIndex], s[self.lineBufferIndex:]
168
def setInsertMode(self):
170
self.terminal.setModes([insults.modes.IRM])
172
def setTypeoverMode(self):
173
self.mode = 'typeover'
174
self.terminal.resetModes([insults.modes.IRM])
176
def drawInputLine(self):
178
Write a line containing the current input prompt and the current line
179
buffer at the current cursor position.
181
self.terminal.write(self.ps[self.pn] + ''.join(self.lineBuffer))
183
def terminalSize(self, width, height):
184
# XXX - Clear the previous input line, redraw it at the new
186
self.terminal.eraseDisplay()
187
self.terminal.cursorHome()
192
def unhandledControlSequence(self, seq):
195
def keystrokeReceived(self, keyID, modifier):
196
m = self.keyHandlers.get(keyID)
199
elif keyID in string.printable:
200
self.characterReceived(keyID, False)
202
log.msg("Received unhandled keyID: %r" % (keyID,))
204
def characterReceived(self, ch, moreCharactersComing):
205
if self.mode == 'insert':
206
self.lineBuffer.insert(self.lineBufferIndex, ch)
208
self.lineBuffer[self.lineBufferIndex:self.lineBufferIndex+1] = [ch]
209
self.lineBufferIndex += 1
210
self.terminal.write(ch)
212
def handle_TAB(self):
213
n = self.TABSTOP - (len(self.lineBuffer) % self.TABSTOP)
214
self.terminal.cursorForward(n)
215
self.lineBufferIndex += n
216
self.lineBuffer.extend(' ' * n)
218
def handle_LEFT(self):
219
if self.lineBufferIndex > 0:
220
self.lineBufferIndex -= 1
221
self.terminal.cursorBackward()
223
def handle_RIGHT(self):
224
if self.lineBufferIndex < len(self.lineBuffer):
225
self.lineBufferIndex += 1
226
self.terminal.cursorForward()
228
def handle_HOME(self):
229
if self.lineBufferIndex:
230
self.terminal.cursorBackward(self.lineBufferIndex)
231
self.lineBufferIndex = 0
233
def handle_END(self):
234
offset = len(self.lineBuffer) - self.lineBufferIndex
236
self.terminal.cursorForward(offset)
237
self.lineBufferIndex = len(self.lineBuffer)
239
def handle_BACKSPACE(self):
240
if self.lineBufferIndex > 0:
241
self.lineBufferIndex -= 1
242
del self.lineBuffer[self.lineBufferIndex]
243
self.terminal.cursorBackward()
244
self.terminal.deleteCharacter()
246
def handle_DELETE(self):
247
if self.lineBufferIndex < len(self.lineBuffer):
248
del self.lineBuffer[self.lineBufferIndex]
249
self.terminal.deleteCharacter()
251
def handle_RETURN(self):
252
line = ''.join(self.lineBuffer)
254
self.lineBufferIndex = 0
255
self.terminal.nextLine()
256
self.lineReceived(line)
258
def handle_INSERT(self):
259
assert self.mode in ('typeover', 'insert')
260
if self.mode == 'typeover':
263
self.setTypeoverMode()
265
def lineReceived(self, line):
268
class HistoricRecvLine(RecvLine):
269
"""L{TerminalProtocol} which adds both basic line-editing features and input history.
271
Everything supported by L{RecvLine} is also supported by this class. In addition, the
272
up and down arrows traverse the input history. Each received line is automatically
273
added to the end of the input history.
275
def connectionMade(self):
276
RecvLine.connectionMade(self)
278
self.historyLines = []
279
self.historyPosition = 0
282
self.keyHandlers.update({t.UP_ARROW: self.handle_UP,
283
t.DOWN_ARROW: self.handle_DOWN})
285
def currentHistoryBuffer(self):
286
b = tuple(self.historyLines)
287
return b[:self.historyPosition], b[self.historyPosition:]
289
def _deliverBuffer(self, buf):
292
self.characterReceived(ch, True)
293
self.characterReceived(buf[-1], False)
296
if self.lineBuffer and self.historyPosition == len(self.historyLines):
297
self.historyLines.append(self.lineBuffer)
298
if self.historyPosition > 0:
300
self.terminal.eraseToLineEnd()
302
self.historyPosition -= 1
305
self._deliverBuffer(self.historyLines[self.historyPosition])
307
def handle_DOWN(self):
308
if self.historyPosition < len(self.historyLines) - 1:
310
self.terminal.eraseToLineEnd()
312
self.historyPosition += 1
315
self._deliverBuffer(self.historyLines[self.historyPosition])
318
self.terminal.eraseToLineEnd()
320
self.historyPosition = len(self.historyLines)
322
self.lineBufferIndex = 0
324
def handle_RETURN(self):
326
self.historyLines.append(''.join(self.lineBuffer))
327
self.historyPosition = len(self.historyLines)
328
return RecvLine.handle_RETURN(self)