1
from ConfigParser import NoOptionError
8
from lava_dispatcher.lava_test_shell import _read_content
9
from lava_dispatcher.signals import SignalHandler
10
from lava_dispatcher.test_data import create_attachment
11
from lava_dispatcher.utils import mkdtemp
14
class ShellHooks(SignalHandler):
    """Signal handler that runs hook scripts shipped in the test definition.

    The test definition repository is copied into a private scratch
    directory; the scripts named in ``handlers`` are looked up relative to
    that copy and executed as subprocesses. The combined stdout/stderr of
    each hook is captured to a file and later attached to the test result.
    """

    def __init__(self, testdef_obj, handlers=None, device_config_vars=None):
        """Prepare working directories and the environment for hook scripts.

        :param testdef_obj: test definition object; its ``repo`` is copied
            into the scratch area and its
            ``context.client.target_device.config`` supplies device settings.
        :param handlers: mapping of hook name (``'start_testcase'``,
            ``'end_testcase'``, ``'postprocess_test_result'``) to a script
            path relative to the test definition repository.
        :param device_config_vars: mapping of environment variable name to
            the device config option (section ``__main__``) whose value the
            variable should carry.
        """
        SignalHandler.__init__(self, testdef_obj)
        self.result_dir = mkdtemp()
        # A fresh dict per instance: a shared ``{}`` default would be stored
        # on every instance and could leak state between them.
        self.handlers = {} if handlers is None else handlers
        if device_config_vars is None:
            device_config_vars = {}
        self.scratch_dir = mkdtemp()
        self.code_dir = os.path.join(self.scratch_dir, 'code')
        shutil.copytree(testdef_obj.repo, self.code_dir)
        device_config = testdef_obj.context.client.target_device.config
        # Hooks run with a copy of our environment plus selected device
        # config values.
        self.our_env = os.environ.copy()
        for env_var, config_var in device_config_vars.iteritems():
            # NOTE(review): the try/except/else framing is assumed from the
            # NoOptionError import and the warning text -- confirm.
            try:
                config_value = device_config.cp.get('__main__', config_var)
            except NoOptionError:
                logging.warning(
                    "No value found for device config %s; leaving %s unset "
                    "in environment", config_var, env_var)
            else:
                self.our_env[env_var] = config_value

    def _invoke_hook(self, name, working_dir, args=None):
        """Run the script registered for hook ``name`` and capture its output.

        :param name: hook name used as the key into ``self.handlers``.
        :param working_dir: directory the script is executed in.
        :param args: optional iterable of extra command line arguments.
        :returns: path of the file holding the script's combined
            stdout/stderr, or ``None`` when no script is registered or the
            registered script does not exist.
        """
        script_name = self.handlers.get(name)
        # NOTE(review): the early return for an unregistered hook is assumed
        # (os.path.join would fail on None) -- confirm.
        if not script_name:
            return None
        script = os.path.join(self.code_dir, script_name)
        if not os.path.exists(script):
            logging.warning("handler script %s not found", script_name)
            return None
        command = [script]
        if args:
            command.extend(args)
        (fd, path) = tempfile.mkstemp(dir=self.code_dir)
        try:
            status = subprocess.call(
                command, cwd=working_dir, env=self.our_env,
                stdout=fd, stderr=subprocess.STDOUT)
        finally:
            # mkstemp hands us an open descriptor; close it so repeated hook
            # invocations do not leak file descriptors.
            os.close(fd)
        # NOTE(review): warning only on a non-zero exit status is assumed
        # from the message text -- confirm.
        if status != 0:
            logging.warning(
                "%s handler script exited with code %s", name, status)
        return path

    def start_testcase(self, test_case_id):
        """Create the per-test-case directory and run the start hook.

        :param test_case_id: identifier used as the directory name under
            ``self.result_dir``.
        :returns: dict with ``'case_dir'`` and ``'start_testcase_output'``;
            the same dict is passed to the later hooks as ``case_data``.
        """
        case_dir = os.path.join(self.result_dir, test_case_id)
        # NOTE(review): creating the directory here is assumed (it is used
        # as the hook's cwd) -- confirm.
        os.mkdir(case_dir)
        case_data = {'case_dir': case_dir}
        case_data['start_testcase_output'] = self._invoke_hook(
            'start_testcase', case_dir)
        return case_data

    def end_testcase(self, test_case_id, case_data):
        """Run the end hook in the test case directory and record its output.

        :param test_case_id: identifier of the test case (not used here).
        :param case_data: dict produced by :meth:`start_testcase`; updated
            in place with ``'end_testcase_output'``.
        """
        case_data['end_testcase_output'] = self._invoke_hook(
            'end_testcase', case_data['case_dir'])

    def postprocess_test_result(self, test_result, case_data):
        """Run the postprocess hook and attach all captured hook output.

        :param test_result: result dict; non-empty hook outputs are appended
            to ``test_result['attachments']`` as ``<key>.txt``.
        :param case_data: dict produced by :meth:`start_testcase`.
        """
        # _result_as_dir is not defined in this class; presumably inherited
        # from SignalHandler -- verify.
        with self._result_as_dir(test_result) as result_dir:
            case_data['postprocess_test_result_output'] = self._invoke_hook(
                'postprocess_test_result', case_data['case_dir'],
                [result_dir])

        output_keys = (
            'start_testcase_output',
            'end_testcase_output',
            'postprocess_test_result_output',
        )
        for key in output_keys:
            path = case_data.get(key)
            # NOTE(review): skipping hooks that produced no output file, and
            # empty output below, is assumed -- confirm.
            if path is None:
                continue
            content = _read_content(path, ignore_missing=True)
            if not content:
                continue
            # Reuse the content already read instead of reading the file a
            # second time without ignore_missing.
            test_result['attachments'].append(
                create_attachment(key + '.txt', content))