13
from utility import temp_dir
16
class ConcurrentlyTest(TestCase):
    """Tests for the module-level functions of ``concurrently``.

    Covers ``main``, ``parse_args``, ``summarise_tasks`` and ``run_all``.
    """

    log_level = logging.ERROR

    # NOTE(review): this method's ``def`` line was absent from the extracted
    # source; the name ``test_main`` is inferred -- confirm against the
    # original file.
    def test_main(self):
        """main() runs the parsed tasks and returns the summarised code."""
        with patch('concurrently.run_all', autospec=True) as r_mock:
            with patch('concurrently.summarise_tasks', autospec=True,
                       return_value=0) as s_mock:
                returncode = concurrently.main(
                    ['-v', '-l', '.', 'one=foo a b', 'two=bar c'])
        self.assertEqual(0, returncode)
        task_one = concurrently.Task.from_arg('one=foo a b')
        task_two = concurrently.Task.from_arg('two=bar c')
        r_mock.assert_called_once_with([task_one, task_two])
        s_mock.assert_called_once_with([task_one, task_two])

    def test_main_error(self):
        """An exception from run_all yields 126 and is logged as an error."""
        with patch('concurrently.run_all', side_effect=ValueError('bad')):
            returncode = concurrently.main(['-v', 'one=foo a b', 'two=bar c'])
        self.assertEqual(126, returncode)
        logged = self.log_stream.getvalue()
        self.assertIn('ERROR Script failed', logged)
        self.assertIn('ValueError: bad', logged)

    def test_bad_task_missing_name(self):
        """A task definition without ``name=`` is reported as a parse error."""
        with parse_error(self) as err_stream:
            concurrently.main(['-v', 'bad'])
        # NOTE(review): the opening ``self.assertIn(`` was absent from the
        # extracted source and has been restored -- confirm.
        self.assertIn(
            "invalid task_definition value: 'bad'", err_stream.getvalue())
        self.assertEqual('', self.log_stream.getvalue())

    def test_bad_task_bad_lex(self):
        """An unbalanced quote in the command is reported as a parse error."""
        with parse_error(self) as err_stream:
            concurrently.main(['-v', 'wrong="command'])
        # NOTE(review): the opening ``self.assertIn(`` was absent from the
        # extracted source and has been restored -- confirm.
        self.assertIn(
            """invalid task_definition value: 'wrong="command'""",
            err_stream.getvalue())
        self.assertEqual('', self.log_stream.getvalue())

    def test_max_failure_returncode(self):
        """With many tasks the return code is clamped to under 127."""
        definitions = ["t{}=job".format(i) for i in range(101)]
        with patch('concurrently.run_all') as r_mock:
            with patch('concurrently.summarise_tasks',
                       return_value=101) as s_mock:
                returncode = concurrently.main(['-v', '-l', '.'] + definitions)
        self.assertEqual(100, returncode)
        # Build a real list: on Python 3 ``map`` returns a lazy, one-shot
        # iterator, which would never compare equal to a list of tasks.
        tasks = [concurrently.Task.from_arg(d) for d in definitions]
        r_mock.assert_called_once_with(tasks)
        s_mock.assert_called_once_with(tasks)

    def test_parse_args(self):
        """parse_args() maps -v, -l and the task definitions onto args."""
        args = concurrently.parse_args(
            ['-v', '-l', '~/logs', 'one=foo a b', 'two=bar c'])
        self.assertEqual(logging.DEBUG, args.verbose)
        self.assertEqual(os.path.expanduser('~/logs'), args.log_dir)
        # NOTE(review): the opening ``self.assertEqual(`` was absent from the
        # extracted source and has been restored -- confirm.
        self.assertEqual(
            [('one', ['foo', 'a', 'b']), ('two', ['bar', 'c'])], args.tasks)

    def test_summarise_tasks(self):
        """summarise_tasks() returns 0, 1 and 2 as more tasks fail."""
        task_one = concurrently.Task.from_arg('one=foo a')
        task_one.returncode = 0
        task_two = concurrently.Task.from_arg('two=bar b')
        task_two.returncode = 0
        tasks = [task_one, task_two]
        self.assertEqual(0, concurrently.summarise_tasks(tasks))
        task_one.returncode = 1
        self.assertEqual(1, concurrently.summarise_tasks(tasks))
        # A return code of 3 still adds only one to the summary.
        task_two.returncode = 3
        self.assertEqual(2, concurrently.summarise_tasks(tasks))

    def test_summarise_is_not_summing(self):
        """Exit codes must not be 0 for failed tasks when truncated to char."""
        task_one = concurrently.Task.from_arg('one=foo a')
        task_one.returncode = 255
        task_two = concurrently.Task.from_arg('two=bar b')
        task_two.returncode = 1
        tasks = [task_one, task_two]
        # 255 + 1 == 256, whose low byte is 0; the summary must avoid that.
        self.assertNotEqual(0, concurrently.summarise_tasks(tasks) & 255)

    def test_run_all(self):
        """run_all() starts and finishes every task and empties the list."""
        task_one = concurrently.Task.from_arg('one=foo a')
        task_two = concurrently.Task.from_arg('two=bar b')
        mutable_tasks = [task_one, task_two]
        with patch.object(task_one, 'finish', autospec=True) as f1_mock:
            with patch.object(task_one, 'start', autospec=True) as s1_mock:
                with patch.object(task_two, 'finish',
                                  autospec=True) as f2_mock:
                    with patch.object(task_two, 'start',
                                      autospec=True) as s2_mock:
                        concurrently.run_all(mutable_tasks)
        s1_mock.assert_called_once_with()
        f1_mock.assert_called_once_with()
        s2_mock.assert_called_once_with()
        f2_mock.assert_called_once_with()
        self.assertEqual([], mutable_tasks)
113
class TaskTest(TestCase):
    """Tests for ``concurrently.Task``: construction, start and finish."""

    # NOTE(review): this method's ``def`` line was absent from the extracted
    # source; the name ``test_init`` is inferred -- confirm against the
    # original file.
    def test_init(self):
        """from_arg() sets name, command and log paths; nothing has run."""
        with temp_dir() as base:
            task = concurrently.Task.from_arg('one=foo a b c', log_dir=base)
            self.assertEqual('one', task.name)
            self.assertEqual(['foo', 'a', 'b', 'c'], task.command)
            # NOTE(review): the ``self.assertEqual(`` openers of the next two
            # assertions were absent from the extracted source -- confirm.
            self.assertEqual(
                os.path.join(base, 'one-out.log'), task.out_log_name)
            self.assertEqual(
                os.path.join(base, 'one-err.log'), task.err_log_name)
            self.assertIsNone(task.returncode)
            self.assertIsNone(task.proc)

    def test_init_quoted_args(self):
        """Quoted arguments in the definition stay a single command word."""
        with temp_dir() as base:
            task = concurrently.Task.from_arg('one=foo a "b c"', log_dir=base)
            self.assertEqual('one', task.name)
            self.assertEqual(['foo', 'a', 'b c'], task.command)

    def test_start(self):
        """start() calls Popen with the command and writable log handles."""
        with temp_dir() as base:
            task = concurrently.Task.from_arg('one=foo a', log_dir=base)
            with patch('subprocess.Popen',
                       autospec=True, return_value='proc') as p_mock:
                with task.start() as proc:
                    self.assertEqual('proc', proc)
                    self.assertEqual('proc', task.proc)
                    self.assertEqual(1, p_mock.call_count)
                    args, kwargs = p_mock.call_args
                    self.assertEqual((['foo', 'a'], ), args)
                    kwargs['stdout'].write('out')
                    kwargs['stderr'].write('err')
            # NOTE(review): the openers of these two assertions were absent
            # from the extracted source; ``assertTrue`` is inferred -- confirm.
            self.assertTrue(
                os.path.exists(os.path.join(base, 'one-out.log')))
            # The source checked 'one-out.log' twice; the second check is for
            # the stderr log, so it now uses 'one-err.log'.
            self.assertTrue(
                os.path.exists(os.path.join(base, 'one-err.log')))

    def test_finish(self):
        """finish() waits exactly once on the task's process."""
        task = concurrently.Task.from_arg('one=foo a')
        task.proc = Mock(spec=['wait'])
        # NOTE(review): the line between the Mock setup and the assertion was
        # absent from the extracted source; ``task.finish()`` is inferred from
        # the test name and the ``wait`` assertion -- confirm.
        task.finish()
        task.proc.wait.assert_called_once_with()