3
6
from hwtest.excluder import Excluder
4
7
from hwtest.iterator import Iterator
5
8
from hwtest.repeater import PreRepeater
6
9
from hwtest.resolver import Resolver
7
from hwtest.plugin import Plugin
9
11
from hwtest.answer import Answer, NO, SKIP
10
from hwtest.template import convert_string
12
from hwtest.lib.file import reader
12
from hwtest.report_helpers import createElement, createTypedElement
14
15
DESKTOP = 'desktop'
17
18
ALL_CATEGORIES = [DESKTOP, LAPTOP, SERVER]
21
class QuestionParser(object):
26
def load_data(self, **data):
27
if "name" not in data:
29
"Question data does not contain a 'name': %s" % data
31
logging.info("Loading question data for: %s", data["name"])
33
if filter(lambda q: q["name"] == data["name"], self.questions):
35
"Question %s already has a question of the same name." \
38
self.questions.append(data)
40
def load_path(self, path):
41
logging.info("Loading question from path: %s", path)
44
for string in reader(fd):
47
def save(field, value, extended, path):
48
if value and extended:
50
"Path %s has both a value and an extended value." % path
51
extended = extended.rstrip("\n")
53
if data.has_key(field):
55
"Path %s has a duplicate field '%s' with a new value '%s'." \
56
% (path, field, value)
57
data[field] = value or extended
59
string = string.strip("\n")
60
field = value = extended = ''
61
for line in string.split("\n"):
63
match = re.search(r"^([-_.A-Za-z0-9]*):\s?(.*)", line)
65
save(field, value, extended, path)
66
field = match.groups()[0].lower()
67
value = match.groups()[1].rstrip()
69
basefield = re.sub(r"-.+$", "", field)
72
if re.search(r"^\s\.$", line):
76
match = re.search(r"^\s(\s+.*)", line)
78
bit = match.groups()[0].rstrip()
79
if len(extended) and not re.search(r"[\n ]$", extended):
82
extended += bit + "\n"
85
match = re.search(r"^\s(.*)", line)
87
bit = match.groups()[0].rstrip()
88
if len(extended) and not re.search(r"[\n ]$", extended):
94
raise Exception, "Path %s parse error at: %s" \
97
save(field, value, extended, path)
98
self.load_data(**data)
100
def load_directory(self, directory):
101
logging.info("Loading questions from directory: %s", directory)
102
for name in [name for name in os.listdir(directory)
103
if name.endswith(".txt")]:
104
path = os.path.join(directory, name)
20
108
class QuestionManager(object):
21
110
def __init__(self):
24
def add(self, question):
25
self.questions.append(question)
113
def add_question(self, question):
114
self._questions.append(question)
27
116
def get_iterator(self):
28
117
def repeat_func(question, resolver):
29
118
answer = question.answer
30
119
if answer and (answer.status == NO or answer.status == SKIP):
31
120
for dependent in resolver.get_dependents(question):
32
dependent.create_answer(SKIP, auto=True)
121
dependent.set_answer(SKIP, auto=True)
34
123
def exclude_next_func(question):
35
124
return question.answer != None
44
133
resolver = Resolver()
45
question_dict = dict((question.name, question) for question in self.questions)
46
for question in self.questions:
47
question_deps = [question_dict[dep] for dep in question.deps]
48
resolver.add(question, *question_deps)
134
question_dict = dict((question.name, question) for question in self._questions)
135
for question in self._questions:
136
question_depends = [question_dict[d] for d in question.depends]
137
resolver.add(question, *question_depends)
50
139
questions = resolver.get_dependents()
51
140
questions_iter = Iterator(questions)
60
149
class Question(object):
62
def __init__(self, name, desc, deps=[], cats=ALL_CATEGORIES, optional=False, command=None):
63
self.name = self.persist_name = name
67
self.optional = optional
68
self.command = command
151
required_fields = ["name", "description"]
153
"categories": ALL_CATEGORIES,
158
def __init__(self, **kwargs):
69
160
self.answer = None
164
for field in self.data.keys():
165
if field not in self.required_fields + self.optional_fields.keys():
167
"Question data contains unknown field: %s" \
170
for field in self.required_fields:
171
if not self.data.has_key(field):
173
"Question data does not contain a '%s': %s" \
176
for field in self.optional_fields.keys():
177
if not self.data.has_key(field):
178
self.data[field] = self.optional_fields[field]
71
180
def __str__(self):
75
def description(self):
76
description = self.desc
78
result = self.command()
79
description = convert_string(self.desc, {'result': result})
87
def create_answer(self, status, data='', auto=False):
183
def __getattr__(self, attr):
184
if attr in self.data:
185
return self.data[attr]
187
raise AttributeError, attr
189
def set_answer(self, status, data='', auto=False):
88
190
self.answer = Answer(self, status, data, auto)
92
class QuestionPlugin(Plugin):
99
report = self._manager.report
100
if not report.finalised:
101
for question in self.questions:
102
tests = getattr(report, 'tests', None)
104
tests = createElement(report, 'tests', report.root)
106
test = createElement(report, 'test', tests)
107
createElement(report, 'suite', test, 'tool')
108
createElement(report, 'name', test, question.name)
109
createElement(report, 'description', test, question.description)
110
createElement(report, 'command', test)
111
createElement(report, 'architectures', test)
112
createTypedElement(report, 'categories', test, None,
113
question.categories, True, 'category')
114
createElement(report, 'optional', test, question.optional)
117
result = createElement(report, 'result', test)
118
createElement(report, 'result_status', result,
119
question.answer.status)
120
createElement(report, 'result_data', result,
121
question.answer.data)
124
for question in self.questions:
125
self._manager.reactor.fire(("prompt", "add-question"), question)