3
# Copyright (C) 2015 Canonical
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License as published by
7
# the Free Software Foundation, either version 3 of the License, or
8
# (at your option) any later version.
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
# GNU General Public License for more details.
15
# You should have received a copy of the GNU General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
import subunit_results
23
from util import logger, queue_to_list
26
class PractiSubunitSession(practitest.PractitestSession):
28
def upload_test_result(self, result):
29
logger.info('Reporting result for {}.'.format(result['test_name']))
30
exit_code = int(result['subunit_test_result']['status'] != 'success')
31
instance_display_id = result['practitest_instance']['id']
32
super().upload_test_result(exit_code, instance_display_id)
35
class PractiSubunitResultSet():
38
practisubunit_session,
42
# yes we need this b/c some digging is required
43
self.practisubunit_session = practisubunit_session
44
self.results = self._assemble_results(practitest_testset,
47
def _assembly_worker(self,
48
practitest_instances_queue,
52
"""Thread assembling a results dict."""
55
practitest_instance = practitest_instances_queue.get()
56
result['practitest_instance'] = practitest_instance
57
# NOTE: this is the expensive operation--need the tests
58
# list b/c custom fields appear in the *test* only, need
59
# to make a relation between instance and test on name
61
self.practisubunit_session.get_instance_custom_field(
66
result['test_name'] = subunit_test_name
68
subunit_results.get_subunit_test_by_name(
72
result['subunit_test_result'] = subunit_result
73
results_queue.put(result)
75
practitest_instances_queue.task_done()
77
def _assemble_results(self,
78
practitest_testset_number,
80
"""Assemble a simple dict of results.
82
{'test_name': _ # the subunit suite (autopilot) test name
83
'practitest_instance': _ # practitest dict
84
'subunit_test_result': _ # subunit dict
88
practitest_instances = self.practisubunit_session.get_instances(
89
practitest_testset_number)
90
practitest_tests = self.practisubunit_session.get_tests()
91
logger.info('Retrieving relevant PractiTest results. . . .')
93
practitest_instances_queue = queue.Queue()
94
practisubunit_results_queue = queue.Queue()
95
for practitest_instance in practitest_instances:
96
thread = threading.Thread(
97
target=self._assembly_worker,
98
args=(practitest_instances_queue,
101
practisubunit_results_queue)
103
thread.setDaemon(True)
105
practitest_instances_queue.put(practitest_instance)
106
practitest_instances_queue.join()
108
return queue_to_list(practisubunit_results_queue)
110
def report_to_practitest(self):
111
for result in self.results:
112
self.practisubunit_session.upload_test_result(result)