~sylvain-pineau/checkbox/restore_IOLogGenerated

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# This file is part of Checkbox.
#
# Copyright 2013 Canonical Ltd.
# Written by:
#   Zygmunt Krynicki <zygmunt.krynicki@canonical.com>
#
# Checkbox is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Checkbox is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.

"""
:mod:`plainbox.impl.highlevel` -- High-level API
================================================
"""

from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from threading import Thread
import logging

from plainbox import __version__ as plainbox_version
from plainbox.abc import IJobResult
from plainbox.impl.exporter import get_all_exporters
from plainbox.impl.result import MemoryJobResult
from plainbox.impl.runner import JobRunner
from plainbox.impl.session.legacy import SessionStateLegacyAPI as SessionState
from plainbox.impl.applogic import run_job_if_possible


# Module-level logger for the high-level API, registered under the
# "plainbox.highlevel" name.
logger = logging.getLogger("plainbox.highlevel")


class Service:
    """
    High-level entry point tying together providers, sessions and job runs.

    The service keeps the provider list, the session list and the
    configuration object it was constructed with, plus a single-worker
    :class:`~concurrent.futures.ThreadPoolExecutor` used by primed jobs.
    """

    def __init__(self, provider_list, session_list, config):
        """
        Initialize the service.

        :param provider_list:
            providers exposed through :attr:`provider_list`
        :param session_list:
            sessions exposed through :attr:`session_list`
        :param config:
            configuration object handed to jobs started via primed jobs
        """
        # TODO: session_list will be changed to session_manager_list
        self._provider_list = provider_list
        self._session_list = session_list
        # One worker only: jobs submitted to the executor run one at a time
        self._executor = ThreadPoolExecutor(1)
        self._config = config

    def close(self):
        """
        Shut down the internal executor.

        With the default arguments of ``Executor.shutdown()`` this waits for
        already submitted work to finish.
        """
        self._executor.shutdown()

    @property
    def version(self):
        """
        First three components of the plainbox version, as "X.Y.Z"
        """
        return "{}.{}.{}".format(*plainbox_version[:3])

    @property
    def provider_list(self):
        """
        The provider list passed to the constructor
        """
        return self._provider_list

    @property
    def session_list(self):
        """
        The session list passed to the constructor
        """
        return self._session_list

    def create_session(self, job_list):
        """
        Create and open a new session for the given jobs.

        :param job_list:
            jobs passed to the session state constructor
        :returns:
            the freshly opened session object
        """
        # TODO: allocate storage
        # TODO: construct state
        # TODO: construct manager, binding storage and state
        # TODO: if something fails destroy storage
        session = SessionState(job_list)
        session.open()
        return session

    def get_all_exporters(self):
        """
        Describe every known exporter.

        :returns:
            dictionary mapping exporter name to the ``supported_option_list``
            of the corresponding exporter class
        """
        return {name: exporter_cls.supported_option_list for
                name, exporter_cls in get_all_exporters().items()}

    def export_session(self, session, output_format, option_list):
        """
        Export a session and return the exported data.

        :param session:
            session to export
        :param output_format:
            name of the exporter to use
        :param option_list:
            options passed to the exporter constructor
        :returns:
            bytes produced by the exporter
        """
        temp_stream = BytesIO()
        self._export_session_to_stream(session, output_format,
                                       option_list, temp_stream)
        return temp_stream.getvalue()

    def export_session_to_file(self, session, output_format, option_list,
                               output_file):
        """
        Export a session into a file.

        :param session:
            session to export
        :param output_format:
            name of the exporter to use
        :param option_list:
            options passed to the exporter constructor
        :param output_file:
            path of the file to write; it is opened in binary write mode so
            any existing content is replaced
        :returns:
            ``output_file``, unchanged
        """
        with open(output_file, 'wb') as stream:
            self._export_session_to_stream(
                session, output_format, option_list, stream)
        return output_file

    def _export_session_to_stream(self, session, output_format, option_list,
                                  stream):
        """
        Export a session to an already opened binary stream.

        The exporter class is looked up by ``output_format`` among all known
        exporters; the lookup fails for an unknown name (presumably with
        KeyError, assuming the registry is a plain mapping).
        """
        exporter_cls = get_all_exporters()[output_format]
        exporter = exporter_cls(option_list)
        data_subset = exporter.get_session_data_subset(session)
        exporter.dump(data_subset, stream)

    def _run(self, session, job, running_job_wrapper):
        """
        Run a job and report its result (thread body used by run_job())

        A job that cannot start gets a synthetic "not supported" result that
        carries the readiness description as its comment. The result is
        handed to ``running_job_wrapper.update_job_result_callback()`` unless
        its outcome is still undecided.
        """
        runner = JobRunner(
            session.session_dir,
            session.jobs_io_log_dir,
            command_io_delegate=running_job_wrapper.ui_io_delegate,
            interaction_callback=running_job_wrapper.emitAskForOutcomeSignal
        )
        job_state = session.job_state_map[job.name]
        if job_state.can_start():
            job_result = runner.run_job(job)
        else:
            job_result = MemoryJobResult({
                'outcome': IJobResult.OUTCOME_NOT_SUPPORTED,
                'comments': job_state.get_readiness_description()
            })
        # Compare by value, not identity: an outcome that is equal to the
        # undecided constant but is a distinct object (for example one that
        # was deserialized) must still be treated as undecided.
        if job_result.outcome != IJobResult.OUTCOME_UNDECIDED:
            running_job_wrapper.update_job_result_callback(job, job_result)

    # FIXME: broken layering, running_job_wrapper is from the dbus layer
    def run_job(self, session, job, running_job_wrapper):
        """
        Start running a job in a new thread.

        :param session:
            session the job belongs to
        :param job:
            job to run
        :param running_job_wrapper:
            object providing the IO delegate, the interaction callback and
            the result callback used while the job runs
        :returns:
            ``job``, immediately, without waiting for the thread to finish
        """
        runner = Thread(target=self._run,
                        args=(session, job, running_job_wrapper))
        runner.start()
        # FIXME: we need to keep track of this thread
        return job

    def prime_job(self, session, job):
        """
        Prime the specified job for running.

        The job will be invoked in a context specific to the session.
        The configuration object associated with this service instance might be
        used to fetch any additional configuration data for certain jobs
        (environment variables)

        :returns: a primed job, ready to be started
        """
        return PrimedJob(self, session, job)


class PrimedJob:
    """
    A job bound to a service and a session, ready to be run.

    The class exists mostly to accommodate the GUI API, which models a
    "RunningJob" entity whose "RunCommand()" method is what actually runs
    the job.
    """

    def __init__(self, service, session, job):
        """
        Set up a primed job.

        Applications should not construct this class directly; use
        :meth:`Service.prime_job()` to obtain an instance.
        """
        self._service, self._session, self._job = service, session, job
        # The runner is created from the session's directories
        self._runner = JobRunner(
            session.session_dir, session.jobs_io_log_dir)

    @property
    def job(self):
        """
        The job this object was primed with
        """
        return self._job

    def run(self):
        """
        Schedule the primed job on the service executor.

        :returns:
            Future that resolves to the job result

        .. note::
            The call does not wait for the job; it returns as soon as the
            work has been submitted.
        """
        executor = self._service._executor
        return executor.submit(self._really_run)

    def _really_run(self):
        """
        Body of the work item executed by the service executor.

        Delegates to run_job_if_possible() and hands back only the job result
        """
        # update=False: the session is deliberately not updated here
        _, result = run_job_if_possible(
            self._session,
            self._runner,
            self._service._config,
            self._job,
            update=False)
        return result