1
# -*- test-case-name: twisted.conch.test.test_recvline -*-
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
3
# See LICENSE for details.
5
"""Basic line editing support.
7
API Stability: Unstable
9
@author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
14
from zope.interface import implements
16
from twisted.conch.insults import insults, helper
18
from twisted.python import log, reflect
21
class Logging(object):
22
"""Wrapper which logs attribute lookups.
24
This was useful in debugging something, I guess. I forget what.
25
It can probably be deleted or moved somewhere more appropriate.
26
Nothing special going on here, really.
28
def __init__(self, original):
29
self.original = original
30
key = reflect.qual(original.__class__)
31
count = _counters.get(key, 0)
32
_counters[key] = count + 1
33
self._logFile = file(key + '-' + str(count), 'w')
36
return str(super(Logging, self).__getattribute__('original'))
39
return repr(super(Logging, self).__getattribute__('original'))
41
def __getattribute__(self, name):
42
original = super(Logging, self).__getattribute__('original')
43
logFile = super(Logging, self).__getattribute__('_logFile')
44
logFile.write(name + '\n')
45
return getattr(original, name)
47
class TransportSequence(object):
48
"""An L{ITerminalTransport} implementation which forwards calls to
49
one or more other L{ITerminalTransport}s.
51
This is a cheap way for servers to keep track of the state they
52
expect the client to see, since all terminal manipulations can be
53
send to the real client and to a terminal emulator that lives in
56
implements(insults.ITerminalTransport)
58
for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
59
'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN',
60
'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
62
exec '%s = object()' % (keyID,)
67
def __init__(self, *transports):
68
assert transports, "Cannot construct a TransportSequence with no transports"
69
self.transports = transports
71
for method in insults.ITerminalTransport:
73
def %s(self, *a, **kw):
74
for tpt in self.transports:
75
result = tpt.%s(*a, **kw)
77
""" % (method, method)
79
class LocalTerminalBufferMixin(object):
80
"""A mixin for RecvLine subclasses which records the state of the terminal.
82
This is accomplished by performing all L{ITerminalTransport} operations on both
83
the transport passed to makeConnection and an instance of helper.TerminalBuffer.
85
@ivar terminalCopy: A L{helper.TerminalBuffer} instance which efforts
86
will be made to keep up to date with the actual terminal
87
associated with this protocol instance.
90
def makeConnection(self, transport):
91
self.terminalCopy = helper.TerminalBuffer()
92
self.terminalCopy.connectionMade()
93
return super(LocalTerminalBufferMixin, self).makeConnection(
94
TransportSequence(transport, self.terminalCopy))
97
return str(self.terminalCopy)
99
class RecvLine(insults.TerminalProtocol):
100
"""L{TerminalProtocol} which adds line editing features.
102
Clients will be prompted for lines of input with all the usual
103
features: character echoing, left and right arrow support for
104
moving the cursor to different areas of the line buffer, backspace
105
and delete for removing characters, and insert for toggling
106
between typeover and insert mode. Tabs will be expanded to enough
107
spaces to move the cursor to the next tabstop (every four
108
characters by default). Enter causes the line buffer to be
109
cleared and the line to be passed to the lineReceived() method
110
which, by default, does nothing. Subclasses are responsible for
111
redrawing the input prompt (this will probably change).
118
ps = ('>>> ', '... ')
121
def connectionMade(self):
122
# A list containing the characters making up the current line
125
# A zero-based (wtf else?) index into self.lineBuffer.
126
# Indicates the current cursor position.
127
self.lineBufferIndex = 0
130
# A map of keyIDs to bound instance methods.
132
t.LEFT_ARROW: self.handle_LEFT,
133
t.RIGHT_ARROW: self.handle_RIGHT,
134
t.TAB: self.handle_TAB,
136
# Both of these should not be necessary, but figuring out
137
# which is necessary is a huge hassle.
138
'\r': self.handle_RETURN,
139
'\n': self.handle_RETURN,
141
t.BACKSPACE: self.handle_BACKSPACE,
142
t.DELETE: self.handle_DELETE,
143
t.INSERT: self.handle_INSERT,
144
t.HOME: self.handle_HOME,
145
t.END: self.handle_END}
147
self.initializeScreen()
149
def initializeScreen(self):
150
# Hmm, state sucks. Oh well.
151
# For now we will just take over the whole terminal.
152
self.terminal.reset()
153
self.terminal.write(self.ps[self.pn])
154
# XXX Note: I would prefer to default to starting in insert
155
# mode, however this does not seem to actually work! I do not
156
# know why. This is probably of interest to implementors
157
# subclassing RecvLine.
159
# XXX XXX Note: But the unit tests all expect the initial mode
160
# to be insert right now. Fuck, there needs to be a way to
161
# query the current mode or something.
162
# self.setTypeoverMode()
165
def currentLineBuffer(self):
166
s = ''.join(self.lineBuffer)
167
return s[:self.lineBufferIndex], s[self.lineBufferIndex:]
169
def setInsertMode(self):
171
self.terminal.setModes([insults.modes.IRM])
173
def setTypeoverMode(self):
174
self.mode = 'typeover'
175
self.terminal.resetModes([insults.modes.IRM])
177
def drawInputLine(self):
179
Write a line containing the current input prompt and the current line
180
buffer at the current cursor position.
182
self.terminal.write(self.ps[self.pn] + ''.join(self.lineBuffer))
184
def terminalSize(self, width, height):
185
# XXX - Clear the previous input line, redraw it at the new
187
self.terminal.eraseDisplay()
188
self.terminal.cursorHome()
193
def unhandledControlSequence(self, seq):
196
def keystrokeReceived(self, keyID, modifier):
197
m = self.keyHandlers.get(keyID)
200
elif keyID in string.printable:
201
self.characterReceived(keyID, False)
203
log.msg("Received unhandled keyID: %r" % (keyID,))
205
def characterReceived(self, ch, moreCharactersComing):
206
if self.mode == 'insert':
207
self.lineBuffer.insert(self.lineBufferIndex, ch)
209
self.lineBuffer[self.lineBufferIndex:self.lineBufferIndex+1] = [ch]
210
self.lineBufferIndex += 1
211
self.terminal.write(ch)
213
def handle_TAB(self):
214
n = self.TABSTOP - (len(self.lineBuffer) % self.TABSTOP)
215
self.terminal.cursorForward(n)
216
self.lineBufferIndex += n
217
self.lineBuffer.extend(' ' * n)
219
def handle_LEFT(self):
220
if self.lineBufferIndex > 0:
221
self.lineBufferIndex -= 1
222
self.terminal.cursorBackward()
224
def handle_RIGHT(self):
225
if self.lineBufferIndex < len(self.lineBuffer):
226
self.lineBufferIndex += 1
227
self.terminal.cursorForward()
229
def handle_HOME(self):
230
if self.lineBufferIndex:
231
self.terminal.cursorBackward(self.lineBufferIndex)
232
self.lineBufferIndex = 0
234
def handle_END(self):
235
offset = len(self.lineBuffer) - self.lineBufferIndex
237
self.terminal.cursorForward(offset)
238
self.lineBufferIndex = len(self.lineBuffer)
240
def handle_BACKSPACE(self):
241
if self.lineBufferIndex > 0:
242
self.lineBufferIndex -= 1
243
del self.lineBuffer[self.lineBufferIndex]
244
self.terminal.cursorBackward()
245
self.terminal.deleteCharacter()
247
def handle_DELETE(self):
248
if self.lineBufferIndex < len(self.lineBuffer):
249
del self.lineBuffer[self.lineBufferIndex]
250
self.terminal.deleteCharacter()
252
def handle_RETURN(self):
253
line = ''.join(self.lineBuffer)
255
self.lineBufferIndex = 0
256
self.terminal.nextLine()
257
self.lineReceived(line)
259
def handle_INSERT(self):
260
assert self.mode in ('typeover', 'insert')
261
if self.mode == 'typeover':
264
self.setTypeoverMode()
266
def lineReceived(self, line):
269
class HistoricRecvLine(RecvLine):
270
"""L{TerminalProtocol} which adds both basic line-editing features and input history.
272
Everything supported by L{RecvLine} is also supported by this class. In addition, the
273
up and down arrows traverse the input history. Each received line is automatically
274
added to the end of the input history.
276
def connectionMade(self):
277
RecvLine.connectionMade(self)
279
self.historyLines = []
280
self.historyPosition = 0
283
self.keyHandlers.update({t.UP_ARROW: self.handle_UP,
284
t.DOWN_ARROW: self.handle_DOWN})
286
def currentHistoryBuffer(self):
287
b = tuple(self.historyLines)
288
return b[:self.historyPosition], b[self.historyPosition:]
290
def _deliverBuffer(self, buf):
293
self.characterReceived(ch, True)
294
self.characterReceived(buf[-1], False)
297
if self.lineBuffer and self.historyPosition == len(self.historyLines):
298
self.historyLines.append(self.lineBuffer)
299
if self.historyPosition > 0:
301
self.terminal.eraseToLineEnd()
303
self.historyPosition -= 1
306
self._deliverBuffer(self.historyLines[self.historyPosition])
308
def handle_DOWN(self):
309
if self.historyPosition < len(self.historyLines) - 1:
311
self.terminal.eraseToLineEnd()
313
self.historyPosition += 1
316
self._deliverBuffer(self.historyLines[self.historyPosition])
319
self.terminal.eraseToLineEnd()
321
self.historyPosition = len(self.historyLines)
323
self.lineBufferIndex = 0
325
def handle_RETURN(self):
327
self.historyLines.append(''.join(self.lineBuffer))
328
self.historyPosition = len(self.historyLines)
329
return RecvLine.handle_RETURN(self)