~thomir-deactivatedaccount/adt-cloud-worker/trunk-make-config-visible

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/env python3

# adt-cloud-worker
# Copyright (C) 2015 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""Main entry point for adt-cloud-worker."""


from __future__ import print_function

import argparse
import configparser
import kombu
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.debug import setup_logging
import os
import shutil
import subprocess
import tempfile

logger = get_logger(__name__)


class Worker(ConsumerMixin):

    def __init__(self, name, connection, queues):
        self.name = name
        self.connection = connection
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queues, callbacks=[self.process])]

    def process(self, body, message):
        logger.info('Got: {}'.format(body))
        # TODO: body validation
        # Ack message once it's valid or message.reject()
        message.ack()

        result_dir = tempfile.mkdtemp(
            prefix='adt-{}'.format(body['request_id']))

        try:
            # what information do we need from the message?
            adt_run_args = [
                '--apt-source', body['package_name'],
                '--output-dir', result_dir, # TODO: replace me
            ]
            if 'apt_pocket' in body:
                adt_run_args += [
                    '--apt-pocket', body['apt_pocket'],
                    '--apt-upgrade',
                ]
            adt_ssh_nova_args = [
                '--', 
                '--flavor', body['nova_flavor'],
                '--image', body['nova_image'],
            ]

            exit_code = run_adt(
                adt_run_args +
                [
                    '---',
                    'ssh',
                    '-s', 'nova',
                ] +
                adt_ssh_nova_args
            )

            # TODO: do sensible things with the exit code:
            # 0    all tests passed
            # 2    at least one test skipped
            # 4    at least one test failed
            # 6    at least one test failed and at least one test skipped
            # 8    no tests in this package
            # 12   erroneous package
            # 16   testbed failure
            # 20   other unexpected failures including bad usage

            # TODO: tar/gzip the output directory.
            # TODO: upload to swift instance at 'swift_container'

        except Exception as e:
            logger.error(e, exc_info=True)
        finally:
            # build the result message
            body['worker'] = self.name
            body['exit_code'] = exit_code
            # TODO: send error logging to result-checker in the message

            queue = self.connection.SimpleQueue('adt.results')
            queue.put(body)
            queue.close()
            # Drop the result directory
            # TODO: it could be possibly useful for clients.
            shutil.rmtree(result_dir)


def run_adt(arguments):
    """Run adt-run with the given arguments.

    Returns the exit code from adt-run.

    """
    # TODO: We probably want something more clever here:
    try:
        subprocess.check_call(['adt-run'] + arguments)
    except subprocess.CalledProcessError as e:
        # log?
        # TODO: filter log content to avoid leaking cloud credentials.
        return e.returncode
    return 0


def main():
    setup_logging(loglevel='DEBUG', loggers=[''])

    parser = argparse.ArgumentParser(
        description='ADT cloud worker ...')
    parser.add_argument('-c', '--conf', default='.adt-service.conf',
                        help='Configuration file path')
    args = parser.parse_args()

    # Load configuration options.
    config = configparser.ConfigParser()
    config.read(args.conf)
    worker_name = config.get('adt', 'name')
    routing_keys = config.get('adt', 'tags').split()
    amqp_uris = config.get('amqp', 'uris').split()

    # Setup nova environment variables based on the configuration file.
    for k, v in config.items('nova'):
        os.environ[k.upper()] = str(v)

    adt_exchange = kombu.Exchange("adt.exchange", type="topic")
    queues = []
    for routing_key in routing_keys:
        queues.append(
            kombu.Queue('adt.requests.{}'.format(routing_key) ,
                        adt_exchange, routing_key=routing_key)
        )

    with kombu.Connection(amqp_uris) as conn:
        try:
            worker = Worker(worker_name, conn, queues)
            worker.run()
        except KeyboardInterrupt:
            print('Bye!')


if __name__ == '__main__':
    main()