~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/conch/insults/helper.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.conch.test.test_helper -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""Partial in-memory terminal emulator
 
6
 
 
7
API Stability: Unstable
 
8
 
 
9
@author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
 
10
"""
 
11
 
 
12
import re, string
 
13
 
 
14
from zope.interface import implements
 
15
 
 
16
from twisted.internet import defer, protocol, reactor
 
17
from twisted.python import log
 
18
 
 
19
from twisted.conch.insults import insults
 
20
 
 
21
FOREGROUND = 30
 
22
BACKGROUND = 40
 
23
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
 
24
 
 
25
class CharacterAttribute:
 
26
    """Represents the attributes of a single character.
 
27
 
 
28
    Character set, intensity, underlinedness, blinkitude, video
 
29
    reversal, as well as foreground and background colors made up a
 
30
    character's attributes.
 
31
    """
 
32
    def __init__(self, charset=insults.G0,
 
33
                 bold=False, underline=False,
 
34
                 blink=False, reverseVideo=False,
 
35
                 foreground=WHITE, background=BLACK,
 
36
 
 
37
                 _subtracting=False):
 
38
        self.charset = charset
 
39
        self.bold = bold
 
40
        self.underline = underline
 
41
        self.blink = blink
 
42
        self.reverseVideo = reverseVideo
 
43
        self.foreground = foreground
 
44
        self.background = background
 
45
 
 
46
        self._subtracting = _subtracting
 
47
 
 
48
    def __eq__(self, other):
 
49
        return vars(self) == vars(other)
 
50
 
 
51
    def __ne__(self, other):
 
52
        return not self.__eq__(other)
 
53
 
 
54
    def copy(self):
 
55
        c = self.__class__()
 
56
        c.__dict__.update(vars(self))
 
57
        return c
 
58
 
 
59
    def wantOne(self, **kw):
 
60
        k, v = kw.popitem()
 
61
        if getattr(self, k) != v:
 
62
            attr = self.copy()
 
63
            attr._subtracting = not v
 
64
            setattr(attr, k, v)
 
65
            return attr
 
66
        else:
 
67
            return self.copy()
 
68
 
 
69
    def toVT102(self):
 
70
        # Spit out a vt102 control sequence that will set up
 
71
        # all the attributes set here.  Except charset.
 
72
        attrs = []
 
73
        if self._subtracting:
 
74
            attrs.append(0)
 
75
        if self.bold:
 
76
            attrs.append(insults.BOLD)
 
77
        if self.underline:
 
78
            attrs.append(insults.UNDERLINE)
 
79
        if self.blink:
 
80
            attrs.append(insults.BLINK)
 
81
        if self.reverseVideo:
 
82
            attrs.append(insults.REVERSE_VIDEO)
 
83
        if self.foreground != WHITE:
 
84
            attrs.append(FOREGROUND + self.foreground)
 
85
        if self.background != BLACK:
 
86
            attrs.append(BACKGROUND + self.background)
 
87
        if attrs:
 
88
            return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
 
89
        return ''
 
90
 
 
91
# XXX - need to support scroll regions and scroll history
 
92
class TerminalBuffer(protocol.Protocol):
 
93
    implements(insults.ITerminalTransport)
 
94
 
 
95
    for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
 
96
                  'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN',
 
97
                  'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
 
98
                  'F10', 'F11', 'F12'):
 
99
        exec '%s = object()' % (keyID,)
 
100
 
 
101
    TAB = '\t'
 
102
    BACKSPACE = '\x7f'
 
103
 
 
104
    width = 80
 
105
    height = 24
 
106
 
 
107
    fill = ' '
 
108
    void = object()
 
109
 
 
110
    def getCharacter(self, x, y):
 
111
        return self.lines[y][x]
 
112
 
 
113
    def connectionMade(self):
 
114
        self.reset()
 
115
 
 
116
    def write(self, bytes):
 
117
        for b in bytes:
 
118
            self.insertAtCursor(b)
 
119
 
 
120
    def _currentCharacterAttributes(self):
 
121
        return CharacterAttribute(self.activeCharset, **self.graphicRendition)
 
122
 
 
123
    def insertAtCursor(self, b):
 
124
        if b == '\r':
 
125
            self.x = 0
 
126
        elif b == '\n' or self.x >= self.width:
 
127
            self.x = 0
 
128
            self._scrollDown()
 
129
        if b in string.printable and b not in '\r\n':
 
130
            ch = (b, self._currentCharacterAttributes())
 
131
            if self.modes.get(insults.modes.IRM):
 
132
                self.lines[self.y][self.x:self.x] = [ch]
 
133
                self.lines[self.y].pop()
 
134
            else:
 
135
                self.lines[self.y][self.x] = ch
 
136
            self.x += 1
 
137
 
 
138
    def _emptyLine(self, width):
 
139
        return [(self.void, self._currentCharacterAttributes()) for i in xrange(width)]
 
140
 
 
141
    def _scrollDown(self):
 
142
        self.y += 1
 
143
        if self.y >= self.height:
 
144
            self.y -= 1
 
145
            del self.lines[0]
 
146
            self.lines.append(self._emptyLine(self.width))
 
147
 
 
148
    def _scrollUp(self):
 
149
        self.y -= 1
 
150
        if self.y < 0:
 
151
            self.y = 0
 
152
            del self.lines[-1]
 
153
            self.lines.insert(0, self._emptyLine(self.width))
 
154
 
 
155
    def cursorUp(self, n=1):
 
156
        self.y = max(0, self.y - n)
 
157
 
 
158
    def cursorDown(self, n=1):
 
159
        self.y = min(self.height - 1, self.y + n)
 
160
 
 
161
    def cursorBackward(self, n=1):
 
162
        self.x = max(0, self.x - n)
 
163
 
 
164
    def cursorForward(self, n=1):
 
165
        self.x = min(self.width, self.x + n)
 
166
 
 
167
    def cursorPosition(self, column, line):
 
168
        self.x = column
 
169
        self.y = line
 
170
 
 
171
    def cursorHome(self):
 
172
        self.x = self.home.x
 
173
        self.y = self.home.y
 
174
 
 
175
    def index(self):
 
176
        self._scrollDown()
 
177
 
 
178
    def reverseIndex(self):
 
179
        self._scrollUp()
 
180
 
 
181
    def nextLine(self):
 
182
        self.insertAtCursor('\n')
 
183
 
 
184
    def saveCursor(self):
 
185
        self._savedCursor = (self.x, self.y)
 
186
 
 
187
    def restoreCursor(self):
 
188
        self.x, self.y = self._savedCursor
 
189
        del self._savedCursor
 
190
 
 
191
    def setModes(self, modes):
 
192
        for m in modes:
 
193
            self.modes[m] = True
 
194
 
 
195
    def resetModes(self, modes):
 
196
        for m in modes:
 
197
            try:
 
198
                del self.modes[m]
 
199
            except KeyError:
 
200
                pass
 
201
 
 
202
    def applicationKeypadMode(self):
 
203
        self.keypadMode = 'app'
 
204
 
 
205
    def numericKeypadMode(self):
 
206
        self.keypadMode = 'num'
 
207
 
 
208
    def selectCharacterSet(self, charSet, which):
 
209
        self.charsets[which] = charSet
 
210
 
 
211
    def shiftIn(self):
 
212
        self.activeCharset = insults.G0
 
213
 
 
214
    def shiftOut(self):
 
215
        self.activeCharset = insults.G1
 
216
 
 
217
    def singleShift2(self):
 
218
        oldActiveCharset = self.activeCharset
 
219
        self.activeCharset = insults.G2
 
220
        f = self.insertAtCursor
 
221
        def insertAtCursor(b):
 
222
            f(b)
 
223
            del self.insertAtCursor
 
224
            self.activeCharset = oldActiveCharset
 
225
        self.insertAtCursor = insertAtCursor
 
226
 
 
227
    def singleShift3(self):
 
228
        oldActiveCharset = self.activeCharset
 
229
        self.activeCharset = insults.G3
 
230
        f = self.insertAtCursor
 
231
        def insertAtCursor(b):
 
232
            f(b)
 
233
            del self.insertAtCursor
 
234
            self.activeCharset = oldActiveCharset
 
235
        self.insertAtCursor = insertAtCursor
 
236
 
 
237
    def selectGraphicRendition(self, *attributes):
 
238
        for a in attributes:
 
239
            if a == insults.NORMAL:
 
240
                self.graphicRendition = {
 
241
                    'bold': False,
 
242
                    'underline': False,
 
243
                    'blink': False,
 
244
                    'reverseVideo': False,
 
245
                    'foreground': WHITE,
 
246
                    'background': BLACK}
 
247
            elif a == insults.BOLD:
 
248
                self.graphicRendition['bold'] = True
 
249
            elif a == insults.UNDERLINE:
 
250
                self.graphicRendition['underline'] = True
 
251
            elif a == insults.BLINK:
 
252
                self.graphicRendition['blink'] = True
 
253
            elif a == insults.REVERSE_VIDEO:
 
254
                self.graphicRendition['reverseVideo'] = True
 
255
            else:
 
256
                try:
 
257
                    v = int(a)
 
258
                except ValueError:
 
259
                    log.msg("Unknown graphic rendition attribute: " + repr(a))
 
260
                else:
 
261
                    if FOREGROUND <= v <= FOREGROUND + N_COLORS:
 
262
                        self.graphicRendition['foreground'] = v - FOREGROUND
 
263
                    elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
 
264
                        self.graphicRendition['background'] = v - BACKGROUND
 
265
                    else:
 
266
                        log.msg("Unknown graphic rendition attribute: " + repr(a))
 
267
 
 
268
    def eraseLine(self):
 
269
        self.lines[self.y] = self._emptyLine(self.width)
 
270
 
 
271
    def eraseToLineEnd(self):
 
272
        width = self.width - self.x
 
273
        self.lines[self.y][self.x:] = self._emptyLine(width)
 
274
 
 
275
    def eraseToLineBeginning(self):
 
276
        self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
 
277
 
 
278
    def eraseDisplay(self):
 
279
        self.lines = [self._emptyLine(self.width) for i in xrange(self.height)]
 
280
 
 
281
    def eraseToDisplayEnd(self):
 
282
        self.eraseToLineEnd()
 
283
        height = self.height - self.y - 1
 
284
        self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
 
285
 
 
286
    def eraseToDisplayBeginning(self):
 
287
        self.eraseToLineBeginning()
 
288
        self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
 
289
 
 
290
    def deleteCharacter(self, n=1):
 
291
        del self.lines[self.y][self.x:self.x+n]
 
292
        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
 
293
 
 
294
    def insertLine(self, n=1):
 
295
        self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
 
296
        del self.lines[self.height:]
 
297
 
 
298
    def deleteLine(self, n=1):
 
299
        del self.lines[self.y:self.y+n]
 
300
        self.lines.extend([self._emptyLine(self.width) for i in range(n)])
 
301
 
 
302
    def reportCursorPosition(self):
 
303
        return (self.x, self.y)
 
304
 
 
305
    def reset(self):
 
306
        self.home = insults.Vector(0, 0)
 
307
        self.x = self.y = 0
 
308
        self.modes = {}
 
309
        self.numericKeypad = 'app'
 
310
        self.activeCharset = insults.G0
 
311
        self.graphicRendition = {
 
312
            'bold': False,
 
313
            'underline': False,
 
314
            'blink': False,
 
315
            'reverseVideo': False,
 
316
            'foreground': WHITE,
 
317
            'background': BLACK}
 
318
        self.charsets = {
 
319
            insults.G0: insults.CS_US,
 
320
            insults.G1: insults.CS_US,
 
321
            insults.G2: insults.CS_ALTERNATE,
 
322
            insults.G3: insults.CS_ALTERNATE_SPECIAL}
 
323
        self.eraseDisplay()
 
324
 
 
325
    def unhandledControlSequence(self, buf):
 
326
        print 'Could not handle', repr(buf)
 
327
 
 
328
    def __str__(self):
 
329
        lines = []
 
330
        for L in self.lines:
 
331
            buf = []
 
332
            length = 0
 
333
            for (ch, attr) in L:
 
334
                if ch is not self.void:
 
335
                    buf.append(ch)
 
336
                    length = len(buf)
 
337
                else:
 
338
                    buf.append(self.fill)
 
339
            lines.append(''.join(buf[:length]))
 
340
        return '\n'.join(lines)
 
341
 
 
342
class ExpectationTimeout(Exception):
 
343
    pass
 
344
 
 
345
class ExpectableBuffer(TerminalBuffer):
 
346
    _mark = 0
 
347
 
 
348
    def connectionMade(self):
 
349
        TerminalBuffer.connectionMade(self)
 
350
        self._expecting = []
 
351
 
 
352
    def write(self, bytes):
 
353
        TerminalBuffer.write(self, bytes)
 
354
        self._checkExpected()
 
355
 
 
356
    def cursorHome(self):
 
357
        TerminalBuffer.cursorHome(self)
 
358
        self._mark = 0
 
359
 
 
360
    def _timeoutExpected(self, d):
 
361
        d.errback(ExpectationTimeout())
 
362
        self._checkExpected()
 
363
 
 
364
    def _checkExpected(self):
 
365
        s = str(self)[self._mark:]
 
366
        while self._expecting:
 
367
            expr, timer, deferred = self._expecting[0]
 
368
            if timer and not timer.active():
 
369
                del self._expecting[0]
 
370
                continue
 
371
            for match in expr.finditer(s):
 
372
                if timer:
 
373
                    timer.cancel()
 
374
                del self._expecting[0]
 
375
                self._mark += match.end()
 
376
                s = s[match.end():]
 
377
                deferred.callback(match)
 
378
                break
 
379
            else:
 
380
                return
 
381
 
 
382
    def expect(self, expression, timeout=None, scheduler=reactor):
 
383
        d = defer.Deferred()
 
384
        timer = None
 
385
        if timeout:
 
386
            timer = scheduler.callLater(timeout, self._timeoutExpected, d)
 
387
        self._expecting.append((re.compile(expression), timer, d))
 
388
        self._checkExpected()
 
389
        return d
 
390
 
 
391
__all__ = ['CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']