~smoser/ubuntu/vivid/cloud-init/snappy

« back to all changes in this revision

Viewing changes to tests/unittests/test__init__.py

  • Committer: Scott Moser
  • Date: 2015-02-27 20:55:58 UTC
  • mfrom: (355.2.8 vivid)
  • Revision ID: smoser@ubuntu.com-20150227205558-glrwdgxqkaz6zyxa
* Merge with vivid at 0.7.7~bzr1067-0ubuntu1
* New upstream snapshot.
  * fix broken consumption of gzipped user-data (LP: #1424900)
  * functional user-data on Azure again (LP: #1423972)
  * CloudStack: support fetching password from virtual router (LP: #1422388)
* New upstream snapshot.
  * Fix for ascii decode in DataSourceAzure (LP: #1422993).
* New upstream snapshot.
  * support for gpt partitioning, utilized in Azure [Daniel Watkins]
  * fix bug in exception handling in mount_cb.
* New upstream snapshot.
  * move to python3 (LP: #1247132)
  * systemd: run cloud-init before systemd-user-sessions.service
  * Use the GCE short hostname. (LP: #1383794)
  * Enable user-data encoding support for GCE. (LP: #1404311)
  * Update to use a newer and better OMNIBUS_URL
  * Be more tolerant of 'ssh_authorized_keys' types
  * Fix parse_ssh_config failing in ssh_util.py
  * Increase the robustness/configurability of the chef module.
  * retain trailing newline from template files when using
    jinja2 (LP: #1355343)
  * fix broken output handling (LP: #1387340)
  * digital ocean datasource
  * update url in config drive documentation
  * freebsd: enable correct behavior on Ec2.
  * freebsd: Use the proper virtio FreeBSD network interface name.

Show diffs side-by-side

added

removed

Lines of Context:
1
1
import os
 
2
import shutil
 
3
import tempfile
 
4
import unittest
2
5
 
3
 
from mocker import MockerTestCase, ARGS, KWARGS
 
6
try:
 
7
    from unittest import mock
 
8
except ImportError:
 
9
    import mock
 
10
try:
 
11
    from contextlib import ExitStack
 
12
except ImportError:
 
13
    from contextlib2 import ExitStack
4
14
 
5
15
from cloudinit import handlers
6
16
from cloudinit import helpers
7
 
from cloudinit import importer
8
17
from cloudinit import settings
9
18
from cloudinit import url_helper
10
19
from cloudinit import util
11
20
 
 
21
from .helpers import TestCase
 
22
 
12
23
 
13
24
class FakeModule(handlers.Handler):
14
25
    def __init__(self):
22
33
        pass
23
34
 
24
35
 
25
 
class TestWalkerHandleHandler(MockerTestCase):
 
36
class TestWalkerHandleHandler(TestCase):
26
37
 
27
38
    def setUp(self):
28
 
 
29
 
        MockerTestCase.setUp(self)
 
39
        super(TestWalkerHandleHandler, self).setUp()
 
40
        tmpdir = tempfile.mkdtemp()
 
41
        self.addCleanup(shutil.rmtree, tmpdir)
30
42
 
31
43
        self.data = {
32
44
            "handlercount": 0,
33
45
            "frequency": "",
34
 
            "handlerdir": self.makeDir(),
 
46
            "handlerdir": tmpdir,
35
47
            "handlers": helpers.ContentHandlers(),
36
48
            "data": None}
37
49
 
38
50
        self.expected_module_name = "part-handler-%03d" % (
39
51
            self.data["handlercount"],)
40
52
        expected_file_name = "%s.py" % self.expected_module_name
41
 
        expected_file_fullname = os.path.join(self.data["handlerdir"],
42
 
                                              expected_file_name)
 
53
        self.expected_file_fullname = os.path.join(
 
54
            self.data["handlerdir"], expected_file_name)
43
55
        self.module_fake = FakeModule()
44
56
        self.ctype = None
45
57
        self.filename = None
46
58
        self.payload = "dummy payload"
47
59
 
48
 
        # Mock the write_file function
49
 
        write_file_mock = self.mocker.replace(util.write_file,
50
 
                                              passthrough=False)
51
 
        write_file_mock(expected_file_fullname, self.payload, 0600)
 
60
        # Mock the write_file() function.  We'll assert that it got called as
 
61
        # expected in each of the individual tests.
 
62
        resources = ExitStack()
 
63
        self.addCleanup(resources.close)
 
64
        self.write_file_mock = resources.enter_context(
 
65
            mock.patch('cloudinit.util.write_file'))
52
66
 
53
67
    def test_no_errors(self):
54
68
        """Payload gets written to file and added to C{pdata}."""
55
 
        import_mock = self.mocker.replace(importer.import_module,
56
 
                                          passthrough=False)
57
 
        import_mock(self.expected_module_name)
58
 
        self.mocker.result(self.module_fake)
59
 
        self.mocker.replay()
60
 
 
61
 
        handlers.walker_handle_handler(self.data, self.ctype, self.filename,
62
 
                                       self.payload)
63
 
 
64
 
        self.assertEqual(1, self.data["handlercount"])
 
69
        with mock.patch('cloudinit.importer.import_module',
 
70
                        return_value=self.module_fake) as mockobj:
 
71
            handlers.walker_handle_handler(self.data, self.ctype,
 
72
                                           self.filename, self.payload)
 
73
            mockobj.assert_called_with_once(self.expected_module_name)
 
74
        self.write_file_mock.assert_called_with_once(
 
75
            self.expected_file_fullname, self.payload, 0o600)
 
76
        self.assertEqual(self.data['handlercount'], 1)
65
77
 
66
78
    def test_import_error(self):
67
79
        """Module import errors are logged. No handler added to C{pdata}."""
68
 
        import_mock = self.mocker.replace(importer.import_module,
69
 
                                          passthrough=False)
70
 
        import_mock(self.expected_module_name)
71
 
        self.mocker.throw(ImportError())
72
 
        self.mocker.replay()
73
 
 
74
 
        handlers.walker_handle_handler(self.data, self.ctype, self.filename,
75
 
                                       self.payload)
76
 
 
77
 
        self.assertEqual(0, self.data["handlercount"])
 
80
        with mock.patch('cloudinit.importer.import_module',
 
81
                        side_effect=ImportError) as mockobj:
 
82
            handlers.walker_handle_handler(self.data, self.ctype,
 
83
                                           self.filename, self.payload)
 
84
            mockobj.assert_called_with_once(self.expected_module_name)
 
85
        self.write_file_mock.assert_called_with_once(
 
86
            self.expected_file_fullname, self.payload, 0o600)
 
87
        self.assertEqual(self.data['handlercount'], 0)
78
88
 
79
89
    def test_attribute_error(self):
80
90
        """Attribute errors are logged. No handler added to C{pdata}."""
81
 
        import_mock = self.mocker.replace(importer.import_module,
82
 
                                          passthrough=False)
83
 
        import_mock(self.expected_module_name)
84
 
        self.mocker.result(self.module_fake)
85
 
        self.mocker.throw(AttributeError())
86
 
        self.mocker.replay()
87
 
 
88
 
        handlers.walker_handle_handler(self.data, self.ctype, self.filename,
89
 
                                       self.payload)
90
 
 
91
 
        self.assertEqual(0, self.data["handlercount"])
92
 
 
93
 
 
94
 
class TestHandlerHandlePart(MockerTestCase):
 
91
        with mock.patch('cloudinit.importer.import_module',
 
92
                        side_effect=AttributeError,
 
93
                        return_value=self.module_fake) as mockobj:
 
94
            handlers.walker_handle_handler(self.data, self.ctype,
 
95
                                           self.filename, self.payload)
 
96
            mockobj.assert_called_with_once(self.expected_module_name)
 
97
        self.write_file_mock.assert_called_with_once(
 
98
            self.expected_file_fullname, self.payload, 0o600)
 
99
        self.assertEqual(self.data['handlercount'], 0)
 
100
 
 
101
 
 
102
class TestHandlerHandlePart(unittest.TestCase):
95
103
 
96
104
    def setUp(self):
97
105
        self.data = "fake data"
108
116
        C{handle_part} is called without C{frequency} for
109
117
        C{handler_version} == 1.
110
118
        """
111
 
        mod_mock = self.mocker.mock()
112
 
        getattr(mod_mock, "frequency")
113
 
        self.mocker.result(settings.PER_INSTANCE)
114
 
        getattr(mod_mock, "handler_version")
115
 
        self.mocker.result(1)
116
 
        mod_mock.handle_part(self.data, self.ctype, self.filename,
117
 
                             self.payload)
118
 
        self.mocker.replay()
119
 
 
120
 
        handlers.run_part(mod_mock, self.data, self.filename,
121
 
                          self.payload, self.frequency, self.headers)
 
119
        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
 
120
                             handler_version=1)
 
121
        handlers.run_part(mod_mock, self.data, self.filename, self.payload,
 
122
                          self.frequency, self.headers)
 
123
        # Assert that the handle_part() method of the mock object got
 
124
        # called with the expected arguments.
 
125
        mod_mock.handle_part.assert_called_with_once(
 
126
            self.data, self.ctype, self.filename, self.payload)
122
127
 
123
128
    def test_normal_version_2(self):
124
129
        """
125
130
        C{handle_part} is called with C{frequency} for
126
131
        C{handler_version} == 2.
127
132
        """
128
 
        mod_mock = self.mocker.mock()
129
 
        getattr(mod_mock, "frequency")
130
 
        self.mocker.result(settings.PER_INSTANCE)
131
 
        getattr(mod_mock, "handler_version")
132
 
        self.mocker.result(2)
133
 
        mod_mock.handle_part(self.data, self.ctype, self.filename,
134
 
                             self.payload, self.frequency)
135
 
        self.mocker.replay()
136
 
 
137
 
        handlers.run_part(mod_mock, self.data, self.filename,
138
 
                          self.payload, self.frequency, self.headers)
 
133
        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
 
134
                             handler_version=2)
 
135
        handlers.run_part(mod_mock, self.data, self.filename, self.payload,
 
136
                          self.frequency, self.headers)
 
137
        # Assert that the handle_part() method of the mock object got
 
138
        # called with the expected arguments.
 
139
        mod_mock.handle_part.assert_called_with_once(
 
140
            self.data, self.ctype, self.filename, self.payload)
139
141
 
140
142
    def test_modfreq_per_always(self):
141
143
        """
142
144
        C{handle_part} is called regardless of frequency if nofreq is always.
143
145
        """
144
146
        self.frequency = "once"
145
 
        mod_mock = self.mocker.mock()
146
 
        getattr(mod_mock, "frequency")
147
 
        self.mocker.result(settings.PER_ALWAYS)
148
 
        getattr(mod_mock, "handler_version")
149
 
        self.mocker.result(1)
150
 
        mod_mock.handle_part(self.data, self.ctype, self.filename,
151
 
                             self.payload)
152
 
        self.mocker.replay()
153
 
 
154
 
        handlers.run_part(mod_mock, self.data, self.filename,
155
 
                          self.payload, self.frequency, self.headers)
 
147
        mod_mock = mock.Mock(frequency=settings.PER_ALWAYS,
 
148
                             handler_version=1)
 
149
        handlers.run_part(mod_mock, self.data, self.filename, self.payload,
 
150
                          self.frequency, self.headers)
 
151
        # Assert that the handle_part() method of the mock object got
 
152
        # called with the expected arguments.
 
153
        mod_mock.handle_part.assert_called_with_once(
 
154
            self.data, self.ctype, self.filename, self.payload)
156
155
 
157
156
    def test_no_handle_when_modfreq_once(self):
158
157
        """C{handle_part} is not called if frequency is once."""
159
158
        self.frequency = "once"
160
 
        mod_mock = self.mocker.mock()
161
 
        getattr(mod_mock, "frequency")
162
 
        self.mocker.result(settings.PER_ONCE)
163
 
        self.mocker.replay()
164
 
 
165
 
        handlers.run_part(mod_mock, self.data, self.filename,
166
 
                          self.payload, self.frequency, self.headers)
 
159
        mod_mock = mock.Mock(frequency=settings.PER_ONCE)
 
160
        handlers.run_part(mod_mock, self.data, self.filename, self.payload,
 
161
                          self.frequency, self.headers)
 
162
        # Assert that the handle_part() method of the mock object got
 
163
        # called with the expected arguments.
 
164
        mod_mock.handle_part.assert_called_with_once(
 
165
            self.data, self.ctype, self.filename, self.payload)
167
166
 
168
167
    def test_exception_is_caught(self):
169
168
        """Exceptions within C{handle_part} are caught and logged."""
170
 
        mod_mock = self.mocker.mock()
171
 
        getattr(mod_mock, "frequency")
172
 
        self.mocker.result(settings.PER_INSTANCE)
173
 
        getattr(mod_mock, "handler_version")
174
 
        self.mocker.result(1)
175
 
        mod_mock.handle_part(self.data, self.ctype, self.filename,
176
 
                             self.payload)
177
 
        self.mocker.throw(Exception())
178
 
        self.mocker.replay()
179
 
 
180
 
        handlers.run_part(mod_mock, self.data, self.filename,
181
 
                          self.payload, self.frequency, self.headers)
182
 
 
183
 
 
184
 
class TestCmdlineUrl(MockerTestCase):
 
169
        mod_mock = mock.Mock(frequency=settings.PER_INSTANCE,
 
170
                             handler_version=1)
 
171
        handlers.run_part(mod_mock, self.data, self.filename, self.payload,
 
172
                          self.frequency, self.headers)
 
173
        mod_mock.handle_part.side_effect = Exception
 
174
        handlers.run_part(mod_mock, self.data, self.filename, self.payload,
 
175
                          self.frequency, self.headers)
 
176
        mod_mock.handle_part.assert_called_with_once(
 
177
            self.data, self.ctype, self.filename, self.payload)
 
178
 
 
179
 
 
180
class TestCmdlineUrl(unittest.TestCase):
185
181
    def test_invalid_content(self):
186
182
        url = "http://example.com/foo"
187
183
        key = "mykey"
188
184
        payload = "0"
189
185
        cmdline = "ro %s=%s bar=1" % (key, url)
190
186
 
191
 
        mock_readurl = self.mocker.replace(url_helper.readurl,
192
 
                                           passthrough=False)
193
 
        mock_readurl(url, ARGS, KWARGS)
194
 
        self.mocker.result(url_helper.StringResponse(payload))
195
 
        self.mocker.replay()
196
 
 
197
 
        self.assertEqual((key, url, None),
198
 
            util.get_cmdline_url(names=[key], starts="xxxxxx",
199
 
                                 cmdline=cmdline))
 
187
        with mock.patch('cloudinit.url_helper.readurl',
 
188
                        return_value=url_helper.StringResponse(payload)):
 
189
            self.assertEqual(
 
190
                util.get_cmdline_url(names=[key], starts="xxxxxx",
 
191
                                     cmdline=cmdline),
 
192
                (key, url, None))
200
193
 
201
194
    def test_valid_content(self):
202
195
        url = "http://example.com/foo"
204
197
        payload = "xcloud-config\nmydata: foo\nbar: wark\n"
205
198
        cmdline = "ro %s=%s bar=1" % (key, url)
206
199
 
207
 
        mock_readurl = self.mocker.replace(url_helper.readurl,
208
 
                                           passthrough=False)
209
 
        mock_readurl(url, ARGS, KWARGS)
210
 
        self.mocker.result(url_helper.StringResponse(payload))
211
 
        self.mocker.replay()
212
 
 
213
 
        self.assertEqual((key, url, payload),
214
 
            util.get_cmdline_url(names=[key], starts="xcloud-config",
215
 
                            cmdline=cmdline))
 
200
        with mock.patch('cloudinit.url_helper.readurl',
 
201
                        return_value=url_helper.StringResponse(payload)):
 
202
            self.assertEqual(
 
203
                util.get_cmdline_url(names=[key], starts="xcloud-config",
 
204
                                     cmdline=cmdline),
 
205
                (key, url, payload))
216
206
 
217
207
    def test_no_key_found(self):
218
208
        url = "http://example.com/foo"
219
209
        key = "mykey"
220
210
        cmdline = "ro %s=%s bar=1" % (key, url)
221
211
 
222
 
        self.mocker.replace(url_helper.readurl, passthrough=False)
223
 
        self.mocker.result(url_helper.StringResponse(""))
224
 
        self.mocker.replay()
 
212
        with mock.patch('cloudinit.url_helper.readurl',
 
213
                        return_value=url_helper.StringResponse('')):
 
214
            self.assertEqual(
 
215
                util.get_cmdline_url(names=["does-not-appear"],
 
216
                                     starts="#cloud-config", cmdline=cmdline),
 
217
                (None, None, None))
225
218
 
226
 
        self.assertEqual((None, None, None),
227
 
            util.get_cmdline_url(names=["does-not-appear"],
228
 
                starts="#cloud-config", cmdline=cmdline))
229
219
 
230
220
# vi: ts=4 expandtab