~jocave/checkbox/hybrid-amd-gpu-mods

« back to all changes in this revision

Viewing changes to checkbox-old/scripts/pm_log_check

  • Committer: Tarmac
  • Author(s): Brendan Donegan
  • Date: 2013-06-03 11:12:58 UTC
  • mfrom: (2154.2.1 bug1185759)
  • Revision ID: tarmac-20130603111258-1b3m5ydvkf1accts
"[r=zkrynicki][bug=1185759][author=brendan-donegan] automatic merge by tarmac"

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python3
 
2
import os
 
3
import sys
 
4
import re
 
5
import difflib
 
6
import logging
 
7
from argparse import ArgumentParser
 
8
 
 
9
# Script return codes
 
10
SUCCESS = 0
 
11
NOT_MATCH = 1
 
12
NOT_FINISHED = 2
 
13
NOT_FOUND = 3
 
14
 
 
15
 
 
16
def main():
 
17
    args = parse_args()
 
18
 
 
19
    if not os.path.isfile(args.input_log_filename):
 
20
        sys.stderr.write('Log file {0!r} not found\n'
 
21
                         .format(args.input_log_filename))
 
22
        sys.exit(NOT_FOUND)
 
23
 
 
24
    LoggingConfiguration.set(args.log_level,
 
25
                             args.output_log_filename)
 
26
    parser = Parser(args.input_log_filename)
 
27
    results = parser.parse()
 
28
 
 
29
    if not compare_results(results):
 
30
        sys.exit(NOT_MATCH)
 
31
 
 
32
    sys.exit(SUCCESS)
 
33
 
 
34
 
 
35
class Parser(object):
 
36
    """
 
37
    Reboot test log file parser
 
38
    """
 
39
    is_logging_line = (re.compile('^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}')
 
40
                       .search)
 
41
    is_getting_info_line = (re.compile('Gathering hardware information...$')
 
42
                            .search)
 
43
    is_executing_line = (re.compile("Executing: '(?P<command>.*)'...$")
 
44
                         .search)
 
45
    is_output_line = re.compile('Output:$').search
 
46
    is_field_line = (re.compile('^- (?P<field>returncode|stdout|stderr):$')
 
47
                     .match)
 
48
    is_test_complete_line = re.compile('test complete$').search
 
49
 
 
50
    def __init__(self, filename):
 
51
        self.filename = filename
 
52
 
 
53
    def parse(self):
 
54
        """
 
55
        Parse log file and return results
 
56
        """
 
57
        with open(self.filename) as f:
 
58
            results = self._parse_file(LineIterator(f))
 
59
        return results
 
60
 
 
61
    def _parse_file(self, iterator):
 
62
        """
 
63
        Parse all lines in iterator and return results
 
64
        """
 
65
        results = []
 
66
        result = {}
 
67
 
 
68
        for line in iterator:
 
69
            if self.is_getting_info_line(line):
 
70
                if result:
 
71
                    # Add last result to list of results
 
72
                    results.append(result)
 
73
 
 
74
                # Initialize for a new iteration results
 
75
                result = {}
 
76
 
 
77
            match = self.is_executing_line(line)
 
78
            if match:
 
79
                command = match.group('command')
 
80
                command_output = self._parse_command_output(iterator)
 
81
 
 
82
                if command_output is not None:
 
83
                    result[command] = command_output
 
84
        else:
 
85
            if result:
 
86
                # Add last result to list of results
 
87
                results.append(result)
 
88
 
 
89
        if not self.is_test_complete_line(line):
 
90
            sys.stderr.write("Test didn't finish properly according to logs\n")
 
91
            sys.exit(NOT_FINISHED)
 
92
 
 
93
        return results
 
94
 
 
95
    def _parse_command_output(self, iterator):
 
96
        """
 
97
        Parse one command output
 
98
        """
 
99
        command_output = None
 
100
 
 
101
        # Skip all lines until command output is found
 
102
        for line in iterator:
 
103
            if self.is_output_line(line):
 
104
                command_output = {}
 
105
                break
 
106
            if (self.is_executing_line(line)
 
107
                or self.is_getting_info_line(line)):
 
108
                # Skip commands with no output
 
109
                iterator.unnext(line)
 
110
                return None
 
111
 
 
112
        # Parse command output message
 
113
        for line in iterator:
 
114
            match = self.is_field_line(line)
 
115
            if match:
 
116
                field = match.group('field')
 
117
                value = self._parse_command_output_field(iterator)
 
118
                command_output[field] = value
 
119
            # Exit when all command output fields
 
120
            # have been gathered
 
121
            else:
 
122
                iterator.unnext(line)
 
123
                break
 
124
 
 
125
        return command_output
 
126
 
 
127
    def _parse_command_output_field(self, iterator):
 
128
        """
 
129
        Parse one command output field
 
130
        """
 
131
        # Accummulate as many lines as needed
 
132
        # for the field value
 
133
        value = []
 
134
        for line in iterator:
 
135
            if (self.is_logging_line(line)
 
136
                or self.is_field_line(line)):
 
137
                iterator.unnext(line)
 
138
                break
 
139
 
 
140
            value.append(line)
 
141
 
 
142
        value = ''.join(value)
 
143
        return value
 
144
 
 
145
 
 
146
class LineIterator:
 
147
    """
 
148
    Iterator wrapper to make it possible
 
149
    to push back lines that shouldn't have been consumed
 
150
    """
 
151
 
 
152
    def __init__(self, iterator):
 
153
        self.iterator = iterator
 
154
        self.buffer = []
 
155
 
 
156
    def __iter__(self):
 
157
        return self
 
158
 
 
159
    def __next__(self):
 
160
        if self.buffer:
 
161
            return self.buffer.pop()
 
162
 
 
163
        return next(self.iterator)
 
164
 
 
165
    def unnext(self, line):
 
166
        self.buffer.append(line)
 
167
 
 
168
 
 
169
class LoggingConfiguration(object):
 
170
    @classmethod
 
171
    def set(cls, log_level, log_filename):
 
172
        """
 
173
        Configure a rotating file logger
 
174
        """
 
175
        logger = logging.getLogger()
 
176
        logger.setLevel(logging.DEBUG)
 
177
 
 
178
        # Log to sys.stderr using log level passed through command line
 
179
        if log_level != logging.NOTSET:
 
180
            log_handler = logging.StreamHandler()
 
181
            formatter = logging.Formatter('%(levelname)-8s %(message)s')
 
182
            log_handler.setFormatter(formatter)
 
183
            log_handler.setLevel(log_level)
 
184
            logger.addHandler(log_handler)
 
185
 
 
186
        # Log to rotating file using DEBUG log level
 
187
        log_handler = logging.FileHandler(log_filename, mode='w')
 
188
        formatter = logging.Formatter('%(asctime)s %(levelname)-8s '
 
189
                                      '%(message)s')
 
190
        log_handler.setFormatter(formatter)
 
191
        log_handler.setLevel(logging.DEBUG)
 
192
        logger.addHandler(log_handler)
 
193
 
 
194
 
 
195
def compare_results(results):
 
196
    """
 
197
    Compare results using first one as a baseline
 
198
    """
 
199
    baseline = results[0]
 
200
 
 
201
    success = True
 
202
    for index, result in enumerate(results[1:]):
 
203
        for command in baseline.keys():
 
204
            baseline_output = baseline[command]
 
205
            result_output = result[command]
 
206
 
 
207
            error_messages = []
 
208
            fields = (set(baseline_output.keys())
 
209
                      | set(result_output.keys()))
 
210
            for field in fields:
 
211
                baseline_field = baseline_output.get(field, '')
 
212
                result_field = result_output.get(field, '')
 
213
 
 
214
                if baseline_field != result_field:
 
215
                    differ = difflib.Differ()
 
216
 
 
217
                    message = ["** {field!r} field doesn't match:"
 
218
                               .format(field=field)]
 
219
                    comparison = differ.compare(baseline_field.splitlines(),
 
220
                                                result_field.splitlines())
 
221
                    message.extend(list(comparison))
 
222
                    error_messages.append('\n'.join(message))
 
223
 
 
224
            if not error_messages:
 
225
                logging.debug('[Iteration {0}] {1}...\t[OK]'
 
226
                              .format(index + 1, command))
 
227
            else:
 
228
                success = False
 
229
                if command.startswith('fwts'):
 
230
                    logging.error('[Iteration {0}] {1}...\t[FAIL]'
 
231
                                  .format(index + 1, command))
 
232
                else:
 
233
                    logging.error('[Iteration {0}] {1}...\t[FAIL]\n'
 
234
                                  .format(index + 1, command))
 
235
                    for message in error_messages:
 
236
                        logging.error(message)
 
237
 
 
238
    return success
 
239
 
 
240
 
 
241
def parse_args():
 
242
    """
 
243
    Parse command-line arguments
 
244
    """
 
245
    parser = ArgumentParser(description=('Check power management '
 
246
                                         'test case results'))
 
247
    parser.add_argument('input_log_filename', metavar='log_filename',
 
248
                        help=('Path to the input log file '
 
249
                              'on which to perform the check'))
 
250
    parser.add_argument('output_log_filename', metavar='log_filename',
 
251
                        help=('Path to the output log file '
 
252
                              'for the results of the check'))
 
253
    log_levels = ['notset', 'debug', 'info', 'warning', 'error', 'critical']
 
254
    parser.add_argument('--log-level', dest='log_level', default='info',
 
255
                        choices=log_levels,
 
256
                        help=('Log level. '
 
257
                              'One of {0} or {1} (%(default)s by default)'
 
258
                              .format(', '.join(log_levels[:-1]),
 
259
                                      log_levels[-1])))
 
260
    args = parser.parse_args()
 
261
    args.log_level = getattr(logging, args.log_level.upper())
 
262
 
 
263
    return args
 
264
 
 
265
 
 
266
if __name__ == '__main__':
 
267
    main()