~cprov/adt-cloud-worker/test-git-access

« back to all changes in this revision

Viewing changes to adt_cloud_worker/__init__.py

  • Committer: Celso Providelo
  • Date: 2015-03-20 00:13:24 UTC
  • mto: This revision was merged to the branch mainline in revision 29.
  • Revision ID: celso.providelo@canonical.com-20150320001324-khix9a7f19mcwjjp
Add logstash / file logging handlers and cleanup messages.

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
import argparse
24
24
import configparser
25
25
import kombu
26
 
from kombu.log import get_logger
27
26
from kombu.mixins import ConsumerMixin
28
 
from kombu.utils.debug import setup_logging
 
27
import logging
 
28
import logstash
29
29
import os
30
30
import socket
31
31
import subprocess
44
44
)
45
45
API_VERSION = "v1"
46
46
 
47
 
logger = get_logger(__name__)
 
47
 
 
48
logger = logging.getLogger(__name__)
48
49
 
49
50
 
50
51
class AdtNovaWorker(ConsumerMixin):
79
80
        Run requested tests and posts results to the 'adt_results' queue
80
81
        for later checking.
81
82
        """
82
 
        logger.info('Got: {}'.format(body))
83
83
        body['worker'] = self.worker_name
 
84
        logger.info('Received message request: {}'.format(body), extra=body)
84
85
 
85
86
        # Run requested tests safely.
86
87
        tarball_path = None
97
98
                    'nova', 'extra_args').split()
98
99
                adt_kwargs['nova_flavor'] = get_default_testbed_flavor()
99
100
                arguments = _make_adt_argument_list(adt_kwargs)
100
 
 
 
101
                logger.info(
 
102
                    'Running adt-run with: {}'.format(
 
103
                        ' '.join(arguments)), extra=body)
101
104
                body['exit_code'] = run_adt(arguments)
 
105
                logger.info(
 
106
                    'Exit code: {}'.format(body['exit_code']), extra=body)
102
107
                _create_run_metadata_file(result_dir, body)
103
108
                tarball_path = _create_tarball_from_directory(
104
109
                    result_dir,
110
115
                upload_tarball_to_swift_container(tarball_path, container_name)
111
116
        except Exception as e:
112
117
            # Unexpected failures ...
113
 
            logger.error(e, exc_info=True)
114
118
            body['exit_code'] = '100'
 
119
            logger.error(e, exc_info=True, extra=body)
115
120
        finally:
116
 
            # The post results
117
 
            # TODO: send error logging to result-checker in the message
118
 
            queue = self.connection.SimpleQueue('adt.results.{}'.format(API_VERSION))
 
121
            logger.info('Posting results: {}'.format(body), extra=body)
 
122
            # TODO: send error logging to result-checker in the message.
 
123
            queue = self.connection.SimpleQueue(
 
124
                'adt.results.{}'.format(API_VERSION))
119
125
            queue.put(body)
120
126
            queue.close()
121
127
            if tarball_path and os.path.exists(tarball_path):
122
128
                os.remove(tarball_path)
123
129
            message.ack()
124
 
            logger.info('Done!')
 
130
            logger.info('Done!', extra=body)
125
131
 
126
132
 
127
133
def _create_run_metadata_file(directory, request):
174
180
 
175
181
    Returns the exit code from adt-run.
176
182
    """
177
 
    # TODO: We probably want something more clever here:
178
 
    try:
179
 
        subprocess.check_call(['adt-run'] + arguments)
180
 
    except subprocess.CalledProcessError as e:
181
 
        # log?
182
 
        # TODO: filter log content to avoid leaking cloud credentials.
183
 
        return e.returncode
184
 
    return 0
 
183
    cmd_line = ['adt-run'] + arguments
 
184
    proc = subprocess.Popen(
 
185
        cmd_line, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
 
186
    proc.communicate()
 
187
    return proc.returncode
185
188
 
186
189
 
187
190
def _make_adt_argument_list(request_configuration):
250
253
    return tarball_path
251
254
 
252
255
 
 
256
def configure_logging(config):
 
257
    root_logger = logging.getLogger()
 
258
    root_logger.setLevel(logging.INFO)
 
259
 
 
260
    # Silence requests logging, which is created by nova, keystone and swift.
 
261
    requests_logger = logging.getLogger('requests')
 
262
    requests_logger.setLevel(logging.ERROR)
 
263
 
 
264
    # If there is no ./logs directory, fallback to stderr.
 
265
    log_path = os.path.abspath(os.path.join(__file__, '../../logs/app.log'))
 
266
    log_dir = os.path.dirname(log_path)
 
267
    if os.path.exists(log_dir):
 
268
        handler = logging.FileHandler(log_path)
 
269
    else:
 
270
        print("'logs' directory '{}' does not exist, using stderr "
 
271
              "for app log.".format(log_dir))
 
272
        handler = logging.StreamHandler()
 
273
 
 
274
    handler.setFormatter(
 
275
        logging.Formatter(
 
276
            '%(asctime)s  %(name)s %(levelname)s: %(message)s'
 
277
        )
 
278
    )
 
279
    root_logger.addHandler(handler)
 
280
 
 
281
    if 'logstash' in config:
 
282
        root_logger.addHandler(
 
283
            logstash.LogstashHandler(
 
284
                config['logstash']['host'],
 
285
                int(config['logstash']['port']),
 
286
                int(config['logstash']['version'])
 
287
            )
 
288
        )
 
289
 
 
290
 
253
291
def main():
254
 
    setup_logging(loglevel='DEBUG', loggers=[''])
255
 
 
256
292
    parser = argparse.ArgumentParser(
257
293
        description='ADT cloud worker ...')
258
294
    parser.add_argument('-c', '--conf', default='adt-service.conf',
263
299
    config = configparser.ConfigParser()
264
300
    config.read(args.conf)
265
301
 
 
302
    configure_logging(config)
 
303
 
266
304
    set_config_dict(config)
267
305
 
268
306
    amqp_uris = config.get('amqp', 'uris').split()