~andrewjbeach/juju-ci-tools/make-local-patcher

« back to all changes in this revision

Viewing changes to concurrently.py

  • Committer: Curtis Hovey
  • Date: 2016-09-20 01:59:47 UTC
  • mto: This revision was merged to the branch mainline in revision 1602.
  • Revision ID: curtis@canonical.com-20160920015947-ko27xkj3a4i774h6
Convert juju instance-ids to true Azure ids.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/python
 
2
 
 
3
from __future__ import print_function
 
4
 
 
5
from argparse import ArgumentParser
 
6
from contextlib import contextmanager
 
7
import logging
 
8
import os
 
9
import subprocess
 
10
import shlex
 
11
import sys
 
12
 
 
13
from utility import configure_logging
 
14
 
 
15
 
 
16
__metaclass__ = type
 
17
 
 
18
 
 
19
log = logging.getLogger("concurrently")
 
20
 
 
21
 
 
22
def task_definition(name_commandline):
    """Split a "name=commandline" string into a task name and a command.

    Only the first '=' separates the name from the command line, so the
    command line itself may contain '=' characters. The command line is
    split into a list of arguments with shlex.split.

    :param name_commandline: A string in the form "name=cmd -opt arg".
    :return: A tuple of (name, command), where command is a list of strings.
    :raises ValueError: If name_commandline does not contain an '='.
    """
    name, separator, commandline = name_commandline.partition('=')
    if not separator:
        # Unpacking the result of split() would fail here with an opaque
        # "not enough values to unpack" error; say what was expected instead.
        raise ValueError(
            "Task {!r} is not in the form name='cmd -opt arg'".format(
                name_commandline))
    command = shlex.split(commandline)
    return name, command
 
26
 
 
27
 
 
28
class Task:
    """A named command whose stdout and stderr are appended to log files."""

    def __init__(self, name, command, log_dir='.'):
        """Record the command and derive the log file paths.

        :param name: The task name; it is used to build the log file names.
        :param command: The command to run, as a list of arguments.
        :param log_dir: The directory that holds "<name>-out.log" and
            "<name>-err.log".
        """
        self.name = name
        self.command = command
        self.out_log_name = os.path.join(
            log_dir, '{}-out.log'.format(self.name))
        self.err_log_name = os.path.join(
            log_dir, '{}-err.log'.format(self.name))
        # Both stay None until start() and finish() set them.
        self.returncode = None
        self.proc = None

    @classmethod
    def from_arg(cls, name_commandline, log_dir='.'):
        """Create a Task from a "name=commandline" string."""
        return cls(*task_definition(name_commandline), log_dir=log_dir)

    def __eq__(self, other):
        """Tasks are equal when name, command and log paths all match."""
        if type(self) != type(other):
            return False
        return (self.name == other.name and
                self.command == other.command and
                self.out_log_name == other.out_log_name and
                self.err_log_name == other.err_log_name)

    def __ne__(self, other):
        """Return the inverse of __eq__.

        Python 2 does not derive != from __eq__, so without this method
        two equal tasks would also compare as not equal.
        """
        return not self == other

    def __repr__(self):
        return '{}(name={!r}, command={!r})'.format(
            type(self).__name__, self.name, self.command)

    @contextmanager
    def start(self):
        """Start the command and yield the running proc.

        The out and err log files are opened in append mode and stay open
        for the duration of the context. This does not wait for the
        process; call finish() to wait and set the returncode.
        """
        with open(self.out_log_name, 'ab') as out_log:
            with open(self.err_log_name, 'ab') as err_log:
                self.proc = subprocess.Popen(
                    self.command, stdout=out_log, stderr=err_log)
                log.debug('Started {}'.format(self.name))
                yield self.proc

    def finish(self):
        """Wait for the started process to exit and store its returncode."""
        log.debug('Waiting for {} to finish'.format(self.name))
        self.returncode = self.proc.wait()
        log.debug('{} finished'.format(self.name))
 
66
 
 
67
 
 
68
def run_all(tasks):
    """Start every task in the list, then finish each of them.

    Tasks are popped from the end of the list, so the list is empty on
    return. A task's start() context is entered before the remaining
    tasks are handled recursively, and its finish() is called only after
    that recursion returns.
    """
    try:
        current = tasks.pop()
    except IndexError:
        # Nothing left to start.
        pass
    else:
        with current.start():
            run_all(tasks)
            current.finish()
 
80
 
 
81
 
 
82
def summarise_tasks(tasks):
    """Log the outcome of the tasks and return how many of them failed.

    A task counts as failed when its returncode is anything other than 0.
    Each failed task is logged as an error with the path to its err log.
    """
    failures = [task for task in tasks if task.returncode != 0]
    if failures:
        log.debug('FAIL')
        for failed in failures:
            log.error('{} failed with {}\nSee {}'.format(
                      failed.name, failed.returncode, failed.err_log_name))
    else:
        log.debug('SUCCESS')
    return len(failures)
 
94
 
 
95
 
 
96
def parse_args(argv=None):
    """Return the parsed args for this program.

    :param argv: The argument list to parse; None is passed through to
        ArgumentParser.parse_args unchanged.
    :return: A namespace with verbose (a logging level), log_dir (with a
        leading "~" expanded) and tasks (a list of task_definition results).
    """
    parser = ArgumentParser(
        description="Run many tasks concurrently.")
    parser.add_argument(
        '-v', '--verbose', action='store_const',
        default=logging.INFO, const=logging.DEBUG,
        help='Increase verbosity.')
    parser.add_argument(
        '-l', '--log_dir', default='.', type=os.path.expanduser,
        help='The path to store the logs for each task.')
    parser.add_argument(
        'tasks', nargs='+', default=[], type=task_definition,
        help="one or more tasks to run in the form of name='cmd -opt arg'.")
    return parser.parse_args(argv)
 
111
 
 
112
 
 
113
def main(argv=None):
    """Run the tasks given on the command line and return an exit code.

    Returns 126 when an exception escapes while the tasks are running.
    Otherwise returns the number of failed tasks, capped at 100.
    """
    args = parse_args(argv)
    configure_logging(args.verbose)
    tasks = [
        Task(*definition, log_dir=args.log_dir) for definition in args.tasks]
    try:
        log.debug('Running these tasks {}'.format([t.name for t in tasks]))
        # run_all empties the list it is given, so pass a copy and keep
        # the original list for the summary.
        run_all(tasks[:])
    except Exception:
        log.exception("Script failed while running tasks")
        return 126
    failed_count = summarise_tasks(tasks)
    return min(100, failed_count)
 
126
 
 
127
 
 
128
if __name__ == '__main__':
 
129
    sys.exit(main())