~andrewjbeach/juju-ci-tools/make-local-patcher

« back to all changes in this revision

Viewing changes to tests/test_concurrently.py

Handle LoggedException in quickstart_deploy, assess_bootstrap.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import logging
2
 
from mock import (
3
 
    Mock,
4
 
    patch,
5
 
    )
6
 
import os
7
 
 
8
 
import concurrently
9
 
from tests import (
10
 
    parse_error,
11
 
    TestCase,
12
 
)
13
 
from utility import temp_dir
14
 
 
15
 
 
16
 
class ConcurrentlyTest(TestCase):
17
 
 
18
 
    log_level = logging.ERROR
19
 
 
20
 
    def test_main(self):
21
 
        with patch('concurrently.run_all', autospec=True) as r_mock:
22
 
            with patch('concurrently.summarise_tasks', autospec=True,
23
 
                       return_value=0) as s_mock:
24
 
                returncode = concurrently.main(
25
 
                    ['-v', '-l', '.', 'one=foo a b', 'two=bar c'])
26
 
        self.assertEqual(0, returncode)
27
 
        task_one = concurrently.Task.from_arg('one=foo a b')
28
 
        task_two = concurrently.Task.from_arg('two=bar c')
29
 
        r_mock.assert_called_once_with([task_one, task_two])
30
 
        s_mock.assert_called_once_with([task_one, task_two])
31
 
 
32
 
    def test_main_error(self):
33
 
        with patch('concurrently.run_all', side_effect=ValueError('bad')):
34
 
            returncode = concurrently.main(['-v', 'one=foo a b', 'two=bar c'])
35
 
        self.assertEqual(126, returncode)
36
 
        self.assertIn('ERROR Script failed', self.log_stream.getvalue())
37
 
        self.assertIn('ValueError: bad', self.log_stream.getvalue())
38
 
 
39
 
    def test_bad_task_missing_name(self):
40
 
        with parse_error(self) as err_stream:
41
 
            concurrently.main(['-v', 'bad'])
42
 
        self.assertIn(
43
 
            "invalid task_definition value: 'bad'", err_stream.getvalue())
44
 
        self.assertEqual('', self.log_stream.getvalue())
45
 
 
46
 
    def test_bad_task_bad_lex(self):
47
 
        with parse_error(self) as err_stream:
48
 
            concurrently.main(['-v', 'wrong="command'])
49
 
        self.assertIn(
50
 
            """invalid task_definition value: 'wrong="command'""",
51
 
            err_stream.getvalue())
52
 
        self.assertEqual('', self.log_stream.getvalue())
53
 
 
54
 
    def test_max_failure_returncode(self):
55
 
        """With many tasks the return code is clamped to under 127."""
56
 
        definitions = ["t{}=job".format(i) for i in range(101)]
57
 
        with patch('concurrently.run_all') as r_mock:
58
 
            with patch('concurrently.summarise_tasks',
59
 
                       return_value=101) as s_mock:
60
 
                returncode = concurrently.main(['-v', '-l', '.'] + definitions)
61
 
        self.assertEqual(100, returncode)
62
 
        tasks = map(concurrently.Task.from_arg, definitions)
63
 
        r_mock.assert_called_once_with(tasks)
64
 
        s_mock.assert_called_once_with(tasks)
65
 
 
66
 
    def test_parse_args(self):
67
 
        args = concurrently.parse_args(
68
 
            ['-v', '-l', '~/logs', 'one=foo a b', 'two=bar c'])
69
 
        self.assertEqual(logging.DEBUG, args.verbose)
70
 
        self.assertEqual(os.path.expanduser('~/logs'), args.log_dir)
71
 
        self.assertEqual(
72
 
            [('one', ['foo', 'a', 'b']), ('two', ['bar', 'c'])], args.tasks)
73
 
 
74
 
    def test_summarise_tasks(self):
75
 
        task_one = concurrently.Task.from_arg('one=foo a')
76
 
        task_one.returncode = 0
77
 
        task_two = concurrently.Task.from_arg('two=bar b')
78
 
        task_two.returncode = 0
79
 
        tasks = [task_one, task_two]
80
 
        self.assertEqual(0, concurrently.summarise_tasks(tasks))
81
 
        task_one.returncode = 1
82
 
        self.assertEqual(1, concurrently.summarise_tasks(tasks))
83
 
        task_two.returncode = 3
84
 
        self.assertEqual(2, concurrently.summarise_tasks(tasks))
85
 
 
86
 
    def test_summarise_is_not_summing(self):
87
 
        """Exit codes must not be 0 for failed tasks when truncated to char."""
88
 
        task_one = concurrently.Task.from_arg('one=foo a')
89
 
        task_one.returncode = 255
90
 
        task_two = concurrently.Task.from_arg('two=bar b')
91
 
        task_two.returncode = 1
92
 
        tasks = [task_one, task_two]
93
 
        self.assertNotEqual(0, concurrently.summarise_tasks(tasks) & 255)
94
 
 
95
 
    def test_run_all(self):
96
 
        task_one = concurrently.Task.from_arg('one=foo a')
97
 
        task_two = concurrently.Task.from_arg('two=bar b')
98
 
        mutable_tasks = [task_one, task_two]
99
 
        with patch.object(task_one, 'finish', autospec=True) as f1_mock:
100
 
            with patch.object(task_one, 'start', autospec=True) as s1_mock:
101
 
                with patch.object(task_two, 'finish',
102
 
                                  autospec=True,) as f2_mock:
103
 
                    with patch.object(task_two, 'start',
104
 
                                      autospec=True) as s2_mock:
105
 
                        concurrently.run_all(mutable_tasks)
106
 
        s1_mock.assert_called_once_with()
107
 
        f1_mock.assert_called_once_with()
108
 
        s2_mock.assert_called_once_with()
109
 
        f2_mock.assert_called_once_with()
110
 
        self.assertEqual([], mutable_tasks)
111
 
 
112
 
 
113
 
class TaskTest(TestCase):
114
 
 
115
 
    def test_init(self):
116
 
        with temp_dir() as base:
117
 
            task = concurrently.Task.from_arg('one=foo a b c', log_dir=base)
118
 
            self.assertEqual('one', task.name)
119
 
            self.assertEqual(['foo', 'a', 'b', 'c'], task.command)
120
 
            self.assertEqual(
121
 
                os.path.join(base, 'one-out.log'), task.out_log_name)
122
 
            self.assertEqual(
123
 
                os.path.join(base, 'one-err.log'), task.err_log_name)
124
 
            self.assertIsNone(task.returncode)
125
 
            self.assertIsNone(task.proc)
126
 
 
127
 
    def test_init_quoted_args(self):
128
 
        with temp_dir() as base:
129
 
            task = concurrently.Task.from_arg('one=foo a "b c"', log_dir=base)
130
 
            self.assertEqual('one', task.name)
131
 
            self.assertEqual(['foo', 'a', 'b c'], task.command)
132
 
 
133
 
    def test_start(self):
134
 
        with temp_dir() as base:
135
 
            task = concurrently.Task.from_arg('one=foo a', log_dir=base)
136
 
            with patch('subprocess.Popen',
137
 
                       autospec=True, return_value='proc') as p_mock:
138
 
                with task.start() as proc:
139
 
                    self.assertEqual('proc', proc)
140
 
                    self.assertEqual('proc', task.proc)
141
 
                    self.assertEqual(1, p_mock.call_count)
142
 
                    args, kwargs = p_mock.call_args
143
 
                    self.assertEqual((['foo', 'a'], ), args)
144
 
                    kwargs['stdout'].write('out')
145
 
                    kwargs['stderr'].write('err')
146
 
            self.assertIs(
147
 
                True,
148
 
                os.path.exists(os.path.join(base, 'one-out.log')))
149
 
            self.assertIs(
150
 
                True,
151
 
                os.path.exists(os.path.join(base, 'one-out.log')))
152
 
 
153
 
    def test_finish(self):
154
 
        task = concurrently.Task.from_arg('one=foo a')
155
 
        task.proc = Mock(spec=['wait'])
156
 
        task.finish()
157
 
        task.proc.wait.assert_called_once_with()