~smoser/ubuntu/vivid/cloud-init/snappy

« back to all changes in this revision

Viewing changes to tests/unittests/test_handler/test_handler_ca_certs.py

  • Committer: Scott Moser
  • Date: 2015-02-27 20:55:58 UTC
  • mfrom: (355.2.8 vivid)
  • Revision ID: smoser@ubuntu.com-20150227205558-glrwdgxqkaz6zyxa
* Merge with vivid at 0.7.7~bzr1067-0ubuntu1
* New upstream snapshot.
  * fix broken consumption of gzipped user-data (LP: #1424900)
  * functional user-data on Azure again (LP: #1423972)
  * CloudStack: support fetching password from virtual router (LP: #1422388)
* New upstream snapshot.
  * Fix for ascii decode in DataSourceAzure (LP: #1422993).
* New upstream snapshot.
  * support for gpt partitioning, utilized in Azure [Daniel Watkins]
  * fix bug in exception handling in mount_cb.
* New upstream snapshot.
  * move to python3 (LP: #1247132)
  * systemd: run cloud-init before systemd-user-sessions.service
  * Use the GCE short hostname. (LP: #1383794)
  * Enable user-data encoding support for GCE. (LP: #1404311)
  * Update to use a newer and better OMNIBUS_URL
  * Be more tolerant of 'ssh_authorized_keys' types
  * Fix parse_ssh_config failing in ssh_util.py
  * Increase the robustness/configurability of the chef module.
  * retain trailing newline from template files when using
    jinja2 (LP: #1355343)
  * fix broken output handling (LP: #1387340)
  * digital ocean datasource
  * update url in config drive documentation
  * freebsd: enable correct behavior on Ec2.
  * freebsd: Use the proper virtio FreeBSD network interface name.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from mocker import MockerTestCase
2
 
 
3
1
from cloudinit import cloud
4
2
from cloudinit import helpers
5
3
from cloudinit import util
6
4
 
7
5
from cloudinit.config import cc_ca_certs
 
6
from ..helpers import TestCase
8
7
 
9
8
import logging
10
 
 
11
 
 
12
 
class TestNoConfig(MockerTestCase):
 
9
import shutil
 
10
import tempfile
 
11
import unittest
 
12
 
 
13
try:
 
14
    from unittest import mock
 
15
except ImportError:
 
16
    import mock
 
17
try:
 
18
    from contextlib import ExitStack
 
19
except ImportError:
 
20
    from contextlib2 import ExitStack
 
21
 
 
22
 
 
23
class TestNoConfig(unittest.TestCase):
13
24
    def setUp(self):
14
25
        super(TestNoConfig, self).setUp()
15
26
        self.name = "ca-certs"
22
33
        Test that nothing is done if no ca-certs configuration is provided.
23
34
        """
24
35
        config = util.get_builtin_cfg()
25
 
        self.mocker.replace(util.write_file, passthrough=False)
26
 
        self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
27
 
        self.mocker.replay()
28
 
 
29
 
        cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
30
 
                           self.args)
31
 
 
32
 
 
33
 
class TestConfig(MockerTestCase):
 
36
        with ExitStack() as mocks:
 
37
            util_mock = mocks.enter_context(
 
38
                mock.patch.object(util, 'write_file'))
 
39
            certs_mock = mocks.enter_context(
 
40
                mock.patch.object(cc_ca_certs, 'update_ca_certs'))
 
41
 
 
42
            cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
 
43
                               self.args)
 
44
 
 
45
            self.assertEqual(util_mock.call_count, 0)
 
46
            self.assertEqual(certs_mock.call_count, 0)
 
47
 
 
48
 
 
49
class TestConfig(TestCase):
34
50
    def setUp(self):
35
51
        super(TestConfig, self).setUp()
36
52
        self.name = "ca-certs"
39
55
        self.log = logging.getLogger("TestNoConfig")
40
56
        self.args = []
41
57
 
 
58
        self.mocks = ExitStack()
 
59
        self.addCleanup(self.mocks.close)
 
60
 
42
61
        # Mock out the functions that actually modify the system
43
 
        self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs,
44
 
                                            passthrough=False)
45
 
        self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
46
 
                                               passthrough=False)
47
 
        self.mock_remove = self.mocker.replace(
48
 
            cc_ca_certs.remove_default_ca_certs, passthrough=False)
49
 
 
50
 
        # Order must be correct
51
 
        self.mocker.order()
 
62
        self.mock_add = self.mocks.enter_context(
 
63
            mock.patch.object(cc_ca_certs, 'add_ca_certs'))
 
64
        self.mock_update = self.mocks.enter_context(
 
65
            mock.patch.object(cc_ca_certs, 'update_ca_certs'))
 
66
        self.mock_remove = self.mocks.enter_context(
 
67
            mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
52
68
 
53
69
    def test_no_trusted_list(self):
54
70
        """
57
73
        """
58
74
        config = {"ca-certs": {}}
59
75
 
60
 
        # No functions should be called
61
 
        self.mock_update()
62
 
        self.mocker.replay()
63
 
 
64
76
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
65
77
 
 
78
        self.assertEqual(self.mock_add.call_count, 0)
 
79
        self.assertEqual(self.mock_update.call_count, 1)
 
80
        self.assertEqual(self.mock_remove.call_count, 0)
 
81
 
66
82
    def test_empty_trusted_list(self):
67
83
        """Test that no certificate are written if 'trusted' list is empty."""
68
84
        config = {"ca-certs": {"trusted": []}}
69
85
 
70
 
        # No functions should be called
71
 
        self.mock_update()
72
 
        self.mocker.replay()
73
 
 
74
86
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
75
87
 
 
88
        self.assertEqual(self.mock_add.call_count, 0)
 
89
        self.assertEqual(self.mock_update.call_count, 1)
 
90
        self.assertEqual(self.mock_remove.call_count, 0)
 
91
 
76
92
    def test_single_trusted(self):
77
93
        """Test that a single cert gets passed to add_ca_certs."""
78
94
        config = {"ca-certs": {"trusted": ["CERT1"]}}
79
95
 
80
 
        self.mock_add(["CERT1"])
81
 
        self.mock_update()
82
 
        self.mocker.replay()
83
 
 
84
96
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
85
97
 
 
98
        self.mock_add.assert_called_once_with(['CERT1'])
 
99
        self.assertEqual(self.mock_update.call_count, 1)
 
100
        self.assertEqual(self.mock_remove.call_count, 0)
 
101
 
86
102
    def test_multiple_trusted(self):
87
103
        """Test that multiple certs get passed to add_ca_certs."""
88
104
        config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
89
105
 
90
 
        self.mock_add(["CERT1", "CERT2"])
91
 
        self.mock_update()
92
 
        self.mocker.replay()
93
 
 
94
106
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
95
107
 
 
108
        self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
 
109
        self.assertEqual(self.mock_update.call_count, 1)
 
110
        self.assertEqual(self.mock_remove.call_count, 0)
 
111
 
96
112
    def test_remove_default_ca_certs(self):
97
113
        """Test remove_defaults works as expected."""
98
114
        config = {"ca-certs": {"remove-defaults": True}}
99
115
 
100
 
        self.mock_remove()
101
 
        self.mock_update()
102
 
        self.mocker.replay()
103
 
 
104
116
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
105
117
 
 
118
        self.assertEqual(self.mock_add.call_count, 0)
 
119
        self.assertEqual(self.mock_update.call_count, 1)
 
120
        self.assertEqual(self.mock_remove.call_count, 1)
 
121
 
106
122
    def test_no_remove_defaults_if_false(self):
107
123
        """Test remove_defaults is not called when config value is False."""
108
124
        config = {"ca-certs": {"remove-defaults": False}}
109
125
 
110
 
        self.mock_update()
111
 
        self.mocker.replay()
112
 
 
113
126
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
114
127
 
 
128
        self.assertEqual(self.mock_add.call_count, 0)
 
129
        self.assertEqual(self.mock_update.call_count, 1)
 
130
        self.assertEqual(self.mock_remove.call_count, 0)
 
131
 
115
132
    def test_correct_order_for_remove_then_add(self):
116
133
        """Test remove_defaults is not called when config value is False."""
117
134
        config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
118
135
 
119
 
        self.mock_remove()
120
 
        self.mock_add(["CERT1"])
121
 
        self.mock_update()
122
 
        self.mocker.replay()
123
 
 
124
136
        cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
125
137
 
126
 
 
127
 
class TestAddCaCerts(MockerTestCase):
 
138
        self.mock_add.assert_called_once_with(['CERT1'])
 
139
        self.assertEqual(self.mock_update.call_count, 1)
 
140
        self.assertEqual(self.mock_remove.call_count, 1)
 
141
 
 
142
 
 
143
class TestAddCaCerts(TestCase):
128
144
 
129
145
    def setUp(self):
130
146
        super(TestAddCaCerts, self).setUp()
 
147
        tmpdir = tempfile.mkdtemp()
 
148
        self.addCleanup(shutil.rmtree, tmpdir)
131
149
        self.paths = helpers.Paths({
132
 
            'cloud_dir': self.makeDir()
 
150
            'cloud_dir': tmpdir,
133
151
        })
134
152
 
135
153
    def test_no_certs_in_list(self):
136
154
        """Test that no certificate are written if not provided."""
137
 
        self.mocker.replace(util.write_file, passthrough=False)
138
 
        self.mocker.replay()
139
 
        cc_ca_certs.add_ca_certs([])
 
155
        with mock.patch.object(util, 'write_file') as mockobj:
 
156
            cc_ca_certs.add_ca_certs([])
 
157
        self.assertEqual(mockobj.call_count, 0)
140
158
 
141
159
    def test_single_cert_trailing_cr(self):
142
160
        """Test adding a single certificate to the trusted CAs
146
164
        ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
147
165
        expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
148
166
 
149
 
        mock_write = self.mocker.replace(util.write_file, passthrough=False)
150
 
        mock_load = self.mocker.replace(util.load_file, passthrough=False)
151
 
 
152
 
        mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
153
 
                   cert, mode=0644)
154
 
 
155
 
        mock_load("/etc/ca-certificates.conf")
156
 
        self.mocker.result(ca_certs_content)
157
 
 
158
 
        mock_write("/etc/ca-certificates.conf", expected, omode="wb")
159
 
        self.mocker.replay()
160
 
 
161
 
        cc_ca_certs.add_ca_certs([cert])
 
167
        with ExitStack() as mocks:
 
168
            mock_write = mocks.enter_context(
 
169
                mock.patch.object(util, 'write_file'))
 
170
            mock_load = mocks.enter_context(
 
171
                mock.patch.object(util, 'load_file',
 
172
                                  return_value=ca_certs_content))
 
173
 
 
174
            cc_ca_certs.add_ca_certs([cert])
 
175
 
 
176
            mock_write.assert_has_calls([
 
177
                mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
 
178
                          cert, mode=0o644),
 
179
                mock.call("/etc/ca-certificates.conf", expected, omode="wb"),
 
180
                ])
 
181
            mock_load.assert_called_once_with("/etc/ca-certificates.conf")
162
182
 
163
183
    def test_single_cert_no_trailing_cr(self):
164
184
        """Test adding a single certificate to the trusted CAs
167
187
 
168
188
        ca_certs_content = "line1\nline2\nline3"
169
189
 
170
 
        mock_write = self.mocker.replace(util.write_file, passthrough=False)
171
 
        mock_load = self.mocker.replace(util.load_file, passthrough=False)
172
 
 
173
 
        mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
174
 
                   cert, mode=0644)
175
 
 
176
 
        mock_load("/etc/ca-certificates.conf")
177
 
        self.mocker.result(ca_certs_content)
178
 
 
179
 
        mock_write("/etc/ca-certificates.conf",
180
 
                   "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
181
 
                   omode="wb")
182
 
        self.mocker.replay()
183
 
 
184
 
        cc_ca_certs.add_ca_certs([cert])
 
190
        with ExitStack() as mocks:
 
191
            mock_write = mocks.enter_context(
 
192
                mock.patch.object(util, 'write_file'))
 
193
            mock_load = mocks.enter_context(
 
194
                mock.patch.object(util, 'load_file',
 
195
                                  return_value=ca_certs_content))
 
196
 
 
197
            cc_ca_certs.add_ca_certs([cert])
 
198
 
 
199
            mock_write.assert_has_calls([
 
200
                mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
 
201
                          cert, mode=0o644),
 
202
                mock.call("/etc/ca-certificates.conf",
 
203
                          "%s\n%s\n" % (ca_certs_content,
 
204
                                        "cloud-init-ca-certs.crt"),
 
205
                          omode="wb"),
 
206
                ])
 
207
 
 
208
            mock_load.assert_called_once_with("/etc/ca-certificates.conf")
185
209
 
186
210
    def test_multiple_certs(self):
187
211
        """Test adding multiple certificates to the trusted CAs."""
188
212
        certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
189
213
        expected_cert_file = "\n".join(certs)
190
 
 
191
 
        mock_write = self.mocker.replace(util.write_file, passthrough=False)
192
 
        mock_load = self.mocker.replace(util.load_file, passthrough=False)
193
 
 
194
 
        mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
195
 
                   expected_cert_file, mode=0644)
196
 
 
197
214
        ca_certs_content = "line1\nline2\nline3"
198
 
        mock_load("/etc/ca-certificates.conf")
199
 
        self.mocker.result(ca_certs_content)
200
 
 
201
 
        out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
202
 
        mock_write("/etc/ca-certificates.conf", out, omode="wb")
203
 
 
204
 
        self.mocker.replay()
205
 
 
206
 
        cc_ca_certs.add_ca_certs(certs)
207
 
 
208
 
 
209
 
class TestUpdateCaCerts(MockerTestCase):
 
215
 
 
216
        with ExitStack() as mocks:
 
217
            mock_write = mocks.enter_context(
 
218
                mock.patch.object(util, 'write_file'))
 
219
            mock_load = mocks.enter_context(
 
220
                mock.patch.object(util, 'load_file',
 
221
                                  return_value=ca_certs_content))
 
222
 
 
223
            cc_ca_certs.add_ca_certs(certs)
 
224
 
 
225
            mock_write.assert_has_calls([
 
226
                mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
 
227
                          expected_cert_file, mode=0o644),
 
228
                mock.call("/etc/ca-certificates.conf",
 
229
                          "%s\n%s\n" % (ca_certs_content,
 
230
                                        "cloud-init-ca-certs.crt"),
 
231
                          omode='wb'),
 
232
                ])
 
233
 
 
234
            mock_load.assert_called_once_with("/etc/ca-certificates.conf")
 
235
 
 
236
 
 
237
class TestUpdateCaCerts(unittest.TestCase):
210
238
    def test_commands(self):
211
 
        mock_check_call = self.mocker.replace(util.subp,
212
 
                                              passthrough=False)
213
 
        mock_check_call(["update-ca-certificates"], capture=False)
214
 
        self.mocker.replay()
215
 
 
216
 
        cc_ca_certs.update_ca_certs()
217
 
 
218
 
 
219
 
class TestRemoveDefaultCaCerts(MockerTestCase):
 
239
        with mock.patch.object(util, 'subp') as mockobj:
 
240
            cc_ca_certs.update_ca_certs()
 
241
            mockobj.assert_called_once_with(
 
242
                ["update-ca-certificates"], capture=False)
 
243
 
 
244
 
 
245
class TestRemoveDefaultCaCerts(TestCase):
220
246
 
221
247
    def setUp(self):
222
248
        super(TestRemoveDefaultCaCerts, self).setUp()
 
249
        tmpdir = tempfile.mkdtemp()
 
250
        self.addCleanup(shutil.rmtree, tmpdir)
223
251
        self.paths = helpers.Paths({
224
 
            'cloud_dir': self.makeDir()
 
252
            'cloud_dir': tmpdir,
225
253
        })
226
254
 
227
255
    def test_commands(self):
228
 
        mock_delete_dir_contents = self.mocker.replace(
229
 
            util.delete_dir_contents, passthrough=False)
230
 
        mock_write = self.mocker.replace(util.write_file, passthrough=False)
231
 
        mock_subp = self.mocker.replace(util.subp,
232
 
                                        passthrough=False)
233
 
 
234
 
        mock_delete_dir_contents("/usr/share/ca-certificates/")
235
 
        mock_delete_dir_contents("/etc/ssl/certs/")
236
 
        mock_write("/etc/ca-certificates.conf", "", mode=0644)
237
 
        mock_subp(('debconf-set-selections', '-'),
238
 
                  "ca-certificates ca-certificates/trust_new_crts select no")
239
 
        self.mocker.replay()
240
 
 
241
 
        cc_ca_certs.remove_default_ca_certs()
 
256
        with ExitStack() as mocks:
 
257
            mock_delete = mocks.enter_context(
 
258
                mock.patch.object(util, 'delete_dir_contents'))
 
259
            mock_write = mocks.enter_context(
 
260
                mock.patch.object(util, 'write_file'))
 
261
            mock_subp = mocks.enter_context(mock.patch.object(util, 'subp'))
 
262
 
 
263
            cc_ca_certs.remove_default_ca_certs()
 
264
 
 
265
            mock_delete.assert_has_calls([
 
266
                mock.call("/usr/share/ca-certificates/"),
 
267
                mock.call("/etc/ssl/certs/"),
 
268
                ])
 
269
 
 
270
            mock_write.assert_called_once_with(
 
271
                "/etc/ca-certificates.conf", "", mode=0o644)
 
272
 
 
273
            mock_subp.assert_called_once_with(
 
274
                ('debconf-set-selections', '-'),
 
275
                "ca-certificates ca-certificates/trust_new_crts select no")