~allanlesage/uci-engine/coverage-extractor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python
# Ubuntu CI Engine
# Copyright 2014 Canonical Ltd.

# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License version 3, as
# published by the Free Software Foundation.

# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import optparse
import os
import sys

import mock

# this is called by cron via a symlink, so get the real "__file__"
here = os.path.dirname(__file__)
if os.path.islink(__file__):
    here = os.path.dirname(os.readlink(__file__))
sys.path.append(os.path.join(here, '../ci-utils'))

from ci_utils import amqp_utils, image_store, unit_config
from imagebuilder import run_worker

DESCRIPTION = 'Nagios-check for Image Builder service.'
VERSION = '1.0'


def _delete_image(log, result):
    for artifact in result.get('artifacts', []):
        if artifact['type'] == 'IMAGE':
            log.info('removing image created by test: %s', artifact['name'])
            img = image_store.ImageStore(unit_config.get_auth_config())
            for image in img.glance.images.list():
                if image.name == artifact['name']:
                    image.delete()
                    return


def check(log):
    """Exercises `ImageBuilderWorker.handle_request`.

    Use controlled (known to be good) parameters for building

    User a 'fake' data_store for collecting known files
    """
    ds = mock.Mock()
    ds.put_file.return_value = 'FAKE'

    worker = run_worker.ImageBuildWorker()
    # Patch the worker data_store factory to return the testing one.
    worker._create_data_store = lambda ticket_id: ds

    # patch the status cb logic which tries to send messages via rabbit
    def status_cb(queue, msg):
        print('PROGRESS_TRIGGER: ' + msg)
    amqp_utils.send = lambda x, y: status_cb(x, y)

    params = {
        'ticket_id': 'image-build-test',
        'image': unit_config.get('base_image'),
        'series': unit_config.get('series'),
        'ppa_list': ['ppa:phablet-team/tools'],
        'package_list': ['phablet-tools'],
        'progress_trigger': 'image-build-test',
    }

    result = {}
    try:
        (cb, result) = worker.handle_request(params, log)
        _delete_image(log, result)
    except Exception as exc:
        log.exception('worker raised: {}'.format(exc.message))
        return 1
    finally:
        worker.save_last_check(result)

    if cb is not amqp_utils.progress_completed:
        log.error('worker failed: {}\n{}'.format(cb, result))
        return 1
    return 0


def main():
    parser = optparse.OptionParser(description=DESCRIPTION, version=VERSION)
    parser.add_option("-l", "--log-level",
                      action="store", dest="log_level", default='ERROR',
                      help="Logging level name.")
    (options, args) = parser.parse_args()

    log = logging.getLogger()
    log_level = logging.getLevelName(options.log_level)
    try:
        log.setLevel(log_level)
    except ValueError as exc:
        log.warn('Using log level INFO ({})'.format(exc.message))
        log.setLevel(logging.INFO)

    return_code = check(log)

    if return_code != 0:
        print 'FAIL'
    else:
        print 'OK'

    sys.exit(return_code)


if __name__ == '__main__':
    main()