3
from __future__ import print_function
5
from argparse import ArgumentParser
6
from contextlib import contextmanager
13
from utility import configure_logging
19
# Module-level logger used by the task and summary code in this script.
log = logging.getLogger("concurrently")
22
def task_definition(name_commandline):
    """Split a ``name=command line`` string into a task name and argv list.

    Only the first ``=`` separates the name from the command line, so the
    command itself may contain ``=``. The command line is tokenised with
    ``shlex.split``, which honours shell-like quoting.

    :param name_commandline: Text of the form ``name=cmd -opt arg``.
    :return: A ``(name, command)`` tuple; ``command`` is a list of strings.
    :raises ValueError: If the text contains no ``=``.
    """
    name, commandline = name_commandline.split('=', 1)
    command = shlex.split(commandline)
    # Callers star-unpack this result as (name, command), so both parts
    # must be returned together.
    return name, command
30
def __init__(self, name, command, log_dir='.'):
    """Record what to run and where its output is logged.

    :param name: Task name; also the stem of both log file names.
    :param command: The command to run, as an argv list.
    :param log_dir: Directory in which the two log files are placed.
    """
    # The name must be stored: the log file names are derived from it and
    # other code reads ``self.name`` when reporting on the task.
    self.name = name
    self.command = command
    self.out_log_name = os.path.join(log_dir, '{}-out.log'.format(name))
    self.err_log_name = os.path.join(log_dir, '{}-err.log'.format(name))
    # Both remain None until the task has been started and waited for.
    self.returncode = None
    self.proc = None
41
def from_arg(cls, name_commandline, log_dir='.'):
    """Alternate constructor: build an instance from a ``name=command`` string.

    :param name_commandline: Text accepted by ``task_definition``.
    :param log_dir: Directory for the instance's log files.
    :return: A new instance of ``cls``.
    """
    # NOTE(review): this receives ``cls``, so it relies on a @classmethod
    # decorator that is not visible in this excerpt -- confirm.
    parsed = task_definition(name_commandline)
    return cls(*parsed, log_dir=log_dir)
44
def __eq__(self, other):
    """Return True when *other* has the same type and describes the same task.

    Name, command and both log file names are compared; the return code
    and any running process are ignored.
    """
    # Exact type match is required: anything of another type (including a
    # subclass or None) is never equal, and its attributes are not touched.
    if type(self) is not type(other):
        return False
    return (self.name == other.name and
            self.command == other.command and
            self.out_log_name == other.out_log_name and
            self.err_log_name == other.err_log_name)
54
"""Yield the running proc, then wait to set the returncode."""
55
with open(self.out_log_name, 'ab') as out_log:
56
with open(self.err_log_name, 'ab') as err_log:
57
self.proc = subprocess.Popen(
58
self.command, stdout=out_log, stderr=err_log)
59
log.debug('Started {}'.format(self.name))
63
log.debug('Waiting for {} to finish'.format(self.name))
64
self.returncode = self.proc.wait()
65
log.debug('{} finished'.format(self.name))
69
"""Run all tasks in the list.
71
The list is a queue that will be emptied.
82
def summarise_tasks(tasks):
    """Log each failed task and return the number of tasks that failed.

    A task counts as failed when its ``returncode`` is anything other than
    0, which includes ``None``.

    :param tasks: Iterable of objects with ``name``, ``returncode`` and
        ``err_log_name`` attributes. It is traversed once.
    :return: The number of failed tasks as an int (0 when none failed).
    """
    # NOTE(review): only the per-task failure report is visible in this
    # excerpt; confirm whether an overall success/failure message also
    # belongs here.
    failed = [task for task in tasks if task.returncode != 0]
    for task in failed:
        log.error('{} failed with {}\nSee {}'.format(
            task.name, task.returncode, task.err_log_name))
    return len(failed)
96
def parse_args(argv=None):
    """Return the parsed args for this program.

    :param argv: List of argument strings; ``None`` makes argparse read
        ``sys.argv[1:]``.
    :return: A namespace with ``verbose`` (a logging level), ``log_dir``
        and ``tasks`` (one converted entry per task argument).
    """
    parser = ArgumentParser(
        description="Run many tasks concurrently.")
    # The flag stores a logging level rather than a boolean, so the value
    # can be handed straight to the logging configuration.
    parser.add_argument(
        '-v', '--verbose', action='store_const',
        default=logging.INFO, const=logging.DEBUG,
        help='Increase verbosity.')
    # expanduser lets callers pass paths such as ~/logs.
    parser.add_argument(
        '-l', '--log_dir', default='.', type=os.path.expanduser,
        help='The path to store the logs for each task.')
    # Each positional is converted by task_definition before being stored.
    parser.add_argument(
        'tasks', nargs='+', default=[], type=task_definition,
        help="one or more tasks to run in the form of name='cmc -opt arg'.")
    return parser.parse_args(argv)
114
"""Run many tasks concurrently."""
115
args = parse_args(argv)
116
configure_logging(args.verbose)
117
tasks = [Task(*t, log_dir=args.log_dir) for t in args.tasks]
119
names = [t.name for t in tasks]
120
log.debug('Running these tasks {}'.format(names))
123
log.exception("Script failed while running tasks")
125
return min(100, summarise_tasks(tasks))
128
if __name__ == '__main__':