~terceiro/lava-dispatcher/filter-ansi-color-codes-mwhudson-state-machine

« back to all changes in this revision

Viewing changes to lava_dispatcher/utils.py

  • Committer: Antonio Terceiro
  • Date: 2013-08-08 22:23:31 UTC
  • Revision ID: antonio.terceiro@linaro.org-20130808222331-l2xqo15bzmooivbb
A solution based on Michael Hudson's state machine

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
import pexpect
37
37
 
38
38
from lava_dispatcher.errors import CriticalError
39
 
 
 
39
from lava_dispatcher.incremental_pattern_remover import (
 
40
    PatternRemover,
 
41
    ansi_escape_pattern
 
42
)
40
43
 
41
44
def link_or_copy_file(src, dest):
42
45
    try:
177
180
        # serial can be slow, races do funny things, so increase delay
178
181
        self.delaybeforesend = 0.05
179
182
 
 
183
        # handle ANSI escape sequences in subprocess output
 
184
        self._filtered_input = []
 
185
        self._ansi_escape_remover = PatternRemover(ansi_escape_pattern, self._filtered_input.append)
 
186
 
180
187
    def sendline(self, s=''):
181
188
        logging.debug("sendline : %s", s)
182
189
        return super(logging_spawn, self).sendline(s)
218
225
        read = super(logging_spawn, self).read_nonblocking
219
226
 
220
227
        raw = read(size, timeout)
221
 
        filtered = remove_ansi_escape_codes(raw)
222
 
 
223
 
        while '\x1b' in filtered:
224
 
            # This happens when the serial connection is too slow -- or just
225
 
            # broken -- and the escape sequences are split across different
226
 
            # reads. We have to accumulate more reads until we find the entire
227
 
            # sequence to be able to filter it out.
228
 
            raw = raw + read(size, timeout)
229
 
            logging.debug("Input with potentially incomplete ANSI "
230
 
                          "escape sequence: %r" % raw)
231
 
            filtered = remove_ansi_escape_codes(raw)
232
 
 
233
 
        return filtered
 
228
        self._ansi_escape_remover.feed(raw)
 
229
        self._ansi_escape_remover.flush()
 
230
 
 
231
        filtered = ''.join(self._filtered_input);
 
232
        del self._filtered_input[:]
 
233
 
 
234
        return filtered;
234
235
 
235
236
 
236
237
def connect_to_serial(context):
295
296
        logging.debug("Finalizing child proccess with PID %d" % proc.pid)
296
297
        proc.kill(9)
297
298
        proc.close()
298
 
 
299
 
def remove_ansi_escape_codes(string):
300
 
    return re.sub('(\x1b\[[0-9;=]*)+[a-zA-Z]', '', string)