~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/conch/insults/helper.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.conch.test.test_helper -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Partial in-memory terminal emulator
 
7
 
 
8
@author: Jp Calderone
 
9
"""
 
10
 
 
11
import re, string
 
12
 
 
13
from zope.interface import implements
 
14
 
 
15
from twisted.internet import defer, protocol, reactor
 
16
from twisted.python import log
 
17
 
 
18
from twisted.conch.insults import insults
 
19
 
 
20
FOREGROUND = 30
 
21
BACKGROUND = 40
 
22
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
 
23
 
 
24
class CharacterAttribute:
 
25
    """Represents the attributes of a single character.
 
26
 
 
27
    Character set, intensity, underlinedness, blinkitude, video
 
28
    reversal, as well as foreground and background colors made up a
 
29
    character's attributes.
 
30
    """
 
31
    def __init__(self, charset=insults.G0,
 
32
                 bold=False, underline=False,
 
33
                 blink=False, reverseVideo=False,
 
34
                 foreground=WHITE, background=BLACK,
 
35
 
 
36
                 _subtracting=False):
 
37
        self.charset = charset
 
38
        self.bold = bold
 
39
        self.underline = underline
 
40
        self.blink = blink
 
41
        self.reverseVideo = reverseVideo
 
42
        self.foreground = foreground
 
43
        self.background = background
 
44
 
 
45
        self._subtracting = _subtracting
 
46
 
 
47
    def __eq__(self, other):
 
48
        return vars(self) == vars(other)
 
49
 
 
50
    def __ne__(self, other):
 
51
        return not self.__eq__(other)
 
52
 
 
53
    def copy(self):
 
54
        c = self.__class__()
 
55
        c.__dict__.update(vars(self))
 
56
        return c
 
57
 
 
58
    def wantOne(self, **kw):
 
59
        k, v = kw.popitem()
 
60
        if getattr(self, k) != v:
 
61
            attr = self.copy()
 
62
            attr._subtracting = not v
 
63
            setattr(attr, k, v)
 
64
            return attr
 
65
        else:
 
66
            return self.copy()
 
67
 
 
68
    def toVT102(self):
 
69
        # Spit out a vt102 control sequence that will set up
 
70
        # all the attributes set here.  Except charset.
 
71
        attrs = []
 
72
        if self._subtracting:
 
73
            attrs.append(0)
 
74
        if self.bold:
 
75
            attrs.append(insults.BOLD)
 
76
        if self.underline:
 
77
            attrs.append(insults.UNDERLINE)
 
78
        if self.blink:
 
79
            attrs.append(insults.BLINK)
 
80
        if self.reverseVideo:
 
81
            attrs.append(insults.REVERSE_VIDEO)
 
82
        if self.foreground != WHITE:
 
83
            attrs.append(FOREGROUND + self.foreground)
 
84
        if self.background != BLACK:
 
85
            attrs.append(BACKGROUND + self.background)
 
86
        if attrs:
 
87
            return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
 
88
        return ''
 
89
 
 
90
# XXX - need to support scroll regions and scroll history
 
91
class TerminalBuffer(protocol.Protocol):
 
92
    """
 
93
    An in-memory terminal emulator.
 
94
    """
 
95
    implements(insults.ITerminalTransport)
 
96
 
 
97
    for keyID in ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
 
98
                  'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN',
 
99
                  'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
 
100
                  'F10', 'F11', 'F12'):
 
101
        exec '%s = object()' % (keyID,)
 
102
 
 
103
    TAB = '\t'
 
104
    BACKSPACE = '\x7f'
 
105
 
 
106
    width = 80
 
107
    height = 24
 
108
 
 
109
    fill = ' '
 
110
    void = object()
 
111
 
 
112
    def getCharacter(self, x, y):
 
113
        return self.lines[y][x]
 
114
 
 
115
    def connectionMade(self):
 
116
        self.reset()
 
117
 
 
118
    def write(self, bytes):
 
119
        """
 
120
        Add the given printable bytes to the terminal.
 
121
 
 
122
        Line feeds in C{bytes} will be replaced with carriage return / line
 
123
        feed pairs.
 
124
        """
 
125
        for b in bytes.replace('\n', '\r\n'):
 
126
            self.insertAtCursor(b)
 
127
 
 
128
    def _currentCharacterAttributes(self):
 
129
        return CharacterAttribute(self.activeCharset, **self.graphicRendition)
 
130
 
 
131
    def insertAtCursor(self, b):
 
132
        """
 
133
        Add one byte to the terminal at the cursor and make consequent state
 
134
        updates.
 
135
 
 
136
        If b is a carriage return, move the cursor to the beginning of the
 
137
        current row.
 
138
 
 
139
        If b is a line feed, move the cursor to the next row or scroll down if
 
140
        the cursor is already in the last row.
 
141
 
 
142
        Otherwise, if b is printable, put it at the cursor position (inserting
 
143
        or overwriting as dictated by the current mode) and move the cursor.
 
144
        """
 
145
        if b == '\r':
 
146
            self.x = 0
 
147
        elif b == '\n':
 
148
            self._scrollDown()
 
149
        elif b in string.printable:
 
150
            if self.x >= self.width:
 
151
                self.nextLine()
 
152
            ch = (b, self._currentCharacterAttributes())
 
153
            if self.modes.get(insults.modes.IRM):
 
154
                self.lines[self.y][self.x:self.x] = [ch]
 
155
                self.lines[self.y].pop()
 
156
            else:
 
157
                self.lines[self.y][self.x] = ch
 
158
            self.x += 1
 
159
 
 
160
    def _emptyLine(self, width):
 
161
        return [(self.void, self._currentCharacterAttributes()) for i in xrange(width)]
 
162
 
 
163
    def _scrollDown(self):
 
164
        self.y += 1
 
165
        if self.y >= self.height:
 
166
            self.y -= 1
 
167
            del self.lines[0]
 
168
            self.lines.append(self._emptyLine(self.width))
 
169
 
 
170
    def _scrollUp(self):
 
171
        self.y -= 1
 
172
        if self.y < 0:
 
173
            self.y = 0
 
174
            del self.lines[-1]
 
175
            self.lines.insert(0, self._emptyLine(self.width))
 
176
 
 
177
    def cursorUp(self, n=1):
 
178
        self.y = max(0, self.y - n)
 
179
 
 
180
    def cursorDown(self, n=1):
 
181
        self.y = min(self.height - 1, self.y + n)
 
182
 
 
183
    def cursorBackward(self, n=1):
 
184
        self.x = max(0, self.x - n)
 
185
 
 
186
    def cursorForward(self, n=1):
 
187
        self.x = min(self.width, self.x + n)
 
188
 
 
189
    def cursorPosition(self, column, line):
 
190
        self.x = column
 
191
        self.y = line
 
192
 
 
193
    def cursorHome(self):
 
194
        self.x = self.home.x
 
195
        self.y = self.home.y
 
196
 
 
197
    def index(self):
 
198
        self._scrollDown()
 
199
 
 
200
    def reverseIndex(self):
 
201
        self._scrollUp()
 
202
 
 
203
    def nextLine(self):
 
204
        """
 
205
        Update the cursor position attributes and scroll down if appropriate.
 
206
        """
 
207
        self.x = 0
 
208
        self._scrollDown()
 
209
 
 
210
    def saveCursor(self):
 
211
        self._savedCursor = (self.x, self.y)
 
212
 
 
213
    def restoreCursor(self):
 
214
        self.x, self.y = self._savedCursor
 
215
        del self._savedCursor
 
216
 
 
217
    def setModes(self, modes):
 
218
        for m in modes:
 
219
            self.modes[m] = True
 
220
 
 
221
    def resetModes(self, modes):
 
222
        for m in modes:
 
223
            try:
 
224
                del self.modes[m]
 
225
            except KeyError:
 
226
                pass
 
227
 
 
228
 
 
229
    def setPrivateModes(self, modes):
 
230
        """
 
231
        Enable the given modes.
 
232
 
 
233
        Track which modes have been enabled so that the implementations of
 
234
        other L{insults.ITerminalTransport} methods can be properly implemented
 
235
        to respect these settings.
 
236
 
 
237
        @see: L{resetPrivateModes}
 
238
        @see: L{insults.ITerminalTransport.setPrivateModes}
 
239
        """
 
240
        for m in modes:
 
241
            self.privateModes[m] = True
 
242
 
 
243
 
 
244
    def resetPrivateModes(self, modes):
 
245
        """
 
246
        Disable the given modes.
 
247
 
 
248
        @see: L{setPrivateModes}
 
249
        @see: L{insults.ITerminalTransport.resetPrivateModes}
 
250
        """
 
251
        for m in modes:
 
252
            try:
 
253
                del self.privateModes[m]
 
254
            except KeyError:
 
255
                pass
 
256
 
 
257
 
 
258
    def applicationKeypadMode(self):
 
259
        self.keypadMode = 'app'
 
260
 
 
261
    def numericKeypadMode(self):
 
262
        self.keypadMode = 'num'
 
263
 
 
264
    def selectCharacterSet(self, charSet, which):
 
265
        self.charsets[which] = charSet
 
266
 
 
267
    def shiftIn(self):
 
268
        self.activeCharset = insults.G0
 
269
 
 
270
    def shiftOut(self):
 
271
        self.activeCharset = insults.G1
 
272
 
 
273
    def singleShift2(self):
 
274
        oldActiveCharset = self.activeCharset
 
275
        self.activeCharset = insults.G2
 
276
        f = self.insertAtCursor
 
277
        def insertAtCursor(b):
 
278
            f(b)
 
279
            del self.insertAtCursor
 
280
            self.activeCharset = oldActiveCharset
 
281
        self.insertAtCursor = insertAtCursor
 
282
 
 
283
    def singleShift3(self):
 
284
        oldActiveCharset = self.activeCharset
 
285
        self.activeCharset = insults.G3
 
286
        f = self.insertAtCursor
 
287
        def insertAtCursor(b):
 
288
            f(b)
 
289
            del self.insertAtCursor
 
290
            self.activeCharset = oldActiveCharset
 
291
        self.insertAtCursor = insertAtCursor
 
292
 
 
293
    def selectGraphicRendition(self, *attributes):
 
294
        for a in attributes:
 
295
            if a == insults.NORMAL:
 
296
                self.graphicRendition = {
 
297
                    'bold': False,
 
298
                    'underline': False,
 
299
                    'blink': False,
 
300
                    'reverseVideo': False,
 
301
                    'foreground': WHITE,
 
302
                    'background': BLACK}
 
303
            elif a == insults.BOLD:
 
304
                self.graphicRendition['bold'] = True
 
305
            elif a == insults.UNDERLINE:
 
306
                self.graphicRendition['underline'] = True
 
307
            elif a == insults.BLINK:
 
308
                self.graphicRendition['blink'] = True
 
309
            elif a == insults.REVERSE_VIDEO:
 
310
                self.graphicRendition['reverseVideo'] = True
 
311
            else:
 
312
                try:
 
313
                    v = int(a)
 
314
                except ValueError:
 
315
                    log.msg("Unknown graphic rendition attribute: " + repr(a))
 
316
                else:
 
317
                    if FOREGROUND <= v <= FOREGROUND + N_COLORS:
 
318
                        self.graphicRendition['foreground'] = v - FOREGROUND
 
319
                    elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
 
320
                        self.graphicRendition['background'] = v - BACKGROUND
 
321
                    else:
 
322
                        log.msg("Unknown graphic rendition attribute: " + repr(a))
 
323
 
 
324
    def eraseLine(self):
 
325
        self.lines[self.y] = self._emptyLine(self.width)
 
326
 
 
327
    def eraseToLineEnd(self):
 
328
        width = self.width - self.x
 
329
        self.lines[self.y][self.x:] = self._emptyLine(width)
 
330
 
 
331
    def eraseToLineBeginning(self):
 
332
        self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
 
333
 
 
334
    def eraseDisplay(self):
 
335
        self.lines = [self._emptyLine(self.width) for i in xrange(self.height)]
 
336
 
 
337
    def eraseToDisplayEnd(self):
 
338
        self.eraseToLineEnd()
 
339
        height = self.height - self.y - 1
 
340
        self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
 
341
 
 
342
    def eraseToDisplayBeginning(self):
 
343
        self.eraseToLineBeginning()
 
344
        self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
 
345
 
 
346
    def deleteCharacter(self, n=1):
 
347
        del self.lines[self.y][self.x:self.x+n]
 
348
        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
 
349
 
 
350
    def insertLine(self, n=1):
 
351
        self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
 
352
        del self.lines[self.height:]
 
353
 
 
354
    def deleteLine(self, n=1):
 
355
        del self.lines[self.y:self.y+n]
 
356
        self.lines.extend([self._emptyLine(self.width) for i in range(n)])
 
357
 
 
358
    def reportCursorPosition(self):
 
359
        return (self.x, self.y)
 
360
 
 
361
    def reset(self):
 
362
        self.home = insults.Vector(0, 0)
 
363
        self.x = self.y = 0
 
364
        self.modes = {}
 
365
        self.privateModes = {}
 
366
        self.setPrivateModes([insults.privateModes.AUTO_WRAP,
 
367
                              insults.privateModes.CURSOR_MODE])
 
368
        self.numericKeypad = 'app'
 
369
        self.activeCharset = insults.G0
 
370
        self.graphicRendition = {
 
371
            'bold': False,
 
372
            'underline': False,
 
373
            'blink': False,
 
374
            'reverseVideo': False,
 
375
            'foreground': WHITE,
 
376
            'background': BLACK}
 
377
        self.charsets = {
 
378
            insults.G0: insults.CS_US,
 
379
            insults.G1: insults.CS_US,
 
380
            insults.G2: insults.CS_ALTERNATE,
 
381
            insults.G3: insults.CS_ALTERNATE_SPECIAL}
 
382
        self.eraseDisplay()
 
383
 
 
384
    def unhandledControlSequence(self, buf):
 
385
        print 'Could not handle', repr(buf)
 
386
 
 
387
    def __str__(self):
 
388
        lines = []
 
389
        for L in self.lines:
 
390
            buf = []
 
391
            length = 0
 
392
            for (ch, attr) in L:
 
393
                if ch is not self.void:
 
394
                    buf.append(ch)
 
395
                    length = len(buf)
 
396
                else:
 
397
                    buf.append(self.fill)
 
398
            lines.append(''.join(buf[:length]))
 
399
        return '\n'.join(lines)
 
400
 
 
401
class ExpectationTimeout(Exception):
 
402
    pass
 
403
 
 
404
class ExpectableBuffer(TerminalBuffer):
 
405
    _mark = 0
 
406
 
 
407
    def connectionMade(self):
 
408
        TerminalBuffer.connectionMade(self)
 
409
        self._expecting = []
 
410
 
 
411
    def write(self, bytes):
 
412
        TerminalBuffer.write(self, bytes)
 
413
        self._checkExpected()
 
414
 
 
415
    def cursorHome(self):
 
416
        TerminalBuffer.cursorHome(self)
 
417
        self._mark = 0
 
418
 
 
419
    def _timeoutExpected(self, d):
 
420
        d.errback(ExpectationTimeout())
 
421
        self._checkExpected()
 
422
 
 
423
    def _checkExpected(self):
 
424
        s = str(self)[self._mark:]
 
425
        while self._expecting:
 
426
            expr, timer, deferred = self._expecting[0]
 
427
            if timer and not timer.active():
 
428
                del self._expecting[0]
 
429
                continue
 
430
            for match in expr.finditer(s):
 
431
                if timer:
 
432
                    timer.cancel()
 
433
                del self._expecting[0]
 
434
                self._mark += match.end()
 
435
                s = s[match.end():]
 
436
                deferred.callback(match)
 
437
                break
 
438
            else:
 
439
                return
 
440
 
 
441
    def expect(self, expression, timeout=None, scheduler=reactor):
 
442
        d = defer.Deferred()
 
443
        timer = None
 
444
        if timeout:
 
445
            timer = scheduler.callLater(timeout, self._timeoutExpected, d)
 
446
        self._expecting.append((re.compile(expression), timer, d))
 
447
        self._checkExpected()
 
448
        return d
 
449
 
 
450
__all__ = ['CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']