~jbauer/cloud-init/salt

« back to all changes in this revision

Viewing changes to tests/unittests/test_util.py

  • Committer: Scott Moser
  • Date: 2012-01-17 21:38:01 UTC
  • mfrom: (499.2.18 add-ca-certs)
  • Revision ID: smoser@ubuntu.com-20120117213801-u7rvefcg2u3ptwbc
add support for add/remove CA Certificates via cloud-config (LP: #915232)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
from unittest import TestCase
2
 
 
3
 
from cloudinit.util import mergedict
 
2
from mocker import MockerTestCase
 
3
from tempfile import mkdtemp
 
4
from shutil import rmtree
 
5
import os
 
6
import stat
 
7
 
 
8
from cloudinit.util import (mergedict, get_cfg_option_list_or_str, write_file,
 
9
                            delete_dir_contents)
 
10
 
4
11
 
5
12
class TestMergeDict(TestCase):
    """Tests for ``mergedict(source, candidate)``.

    The cases below expect candidate to contribute only keys that source
    lacks, and source to win whenever both sides provide a value.
    """

    def test_simple_merge(self):
        """Test simple non-conflict merge."""
        source = {"key1": "value1"}
        candidate = {"key2": "value2"}
        result = mergedict(source, candidate)
        self.assertEqual({"key1": "value1", "key2": "value2"}, result)

    def test_nested_merge(self):
        """Test nested merge."""
        source = {"key1": {"key1.1": "value1.1"}}
        candidate = {"key1": {"key1.2": "value1.2"}}
        result = mergedict(source, candidate)
        self.assertEqual(
            {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)

    def test_merge_does_not_override(self):
        """Test that candidate doesn't override source."""
        source = {"key1": "value1", "key2": "value2"}
        # Every key conflicts with source, each with a different value.
        # (Repeating "key2" in the literal would collapse to one entry and
        # leave "key1" untested.)
        candidate = {"key1": "value2", "key2": "NEW VALUE"}
        result = mergedict(source, candidate)
        # Compare against a literal rather than ``source`` itself so the
        # check stays meaningful even if mergedict hands back the very
        # object it was given.
        self.assertEqual({"key1": "value1", "key2": "value2"}, result)

    def test_empty_candidate(self):
        """Test empty candidate doesn't change source."""
        source = {"key": "value"}
        candidate = {}
        result = mergedict(source, candidate)
        self.assertEqual({"key": "value"}, result)

    def test_empty_source(self):
        """Test empty source is replaced by candidate."""
        source = {}
        candidate = {"key": "value"}
        result = mergedict(source, candidate)
        self.assertEqual({"key": "value"}, result)

    def test_non_dict_candidate(self):
        """Test non-dict candidate is discarded."""
        source = {"key": "value"}
        candidate = "not a dict"
        result = mergedict(source, candidate)
        self.assertEqual({"key": "value"}, result)

    def test_non_dict_source(self):
        """Test non-dict source is not modified with a dict candidate."""
        source = "not a dict"
        candidate = {"key": "value"}
        result = mergedict(source, candidate)
        self.assertEqual(source, result)

    def test_neither_dict(self):
        """Test if neither candidate or source is dict source wins."""
        source = "source"
        candidate = "candidate"
        result = mergedict(source, candidate)
        self.assertEqual(source, result)
 
69
 
 
70
 
 
71
class TestGetCfgOptionListOrStr(TestCase):
    """Exercise ``get_cfg_option_list_or_str`` for missing and present keys."""

    def test_not_found_no_default(self):
        """A missing key with no default supplied yields None."""
        self.assertIsNone(get_cfg_option_list_or_str({}, "key"))

    def test_not_found_with_default(self):
        """A missing key yields the supplied default."""
        looked_up = get_cfg_option_list_or_str(
            {}, "key", default=["DEFAULT"])
        self.assertEqual(["DEFAULT"], looked_up)

    def test_found_with_default(self):
        """A present key takes precedence over the supplied default."""
        looked_up = get_cfg_option_list_or_str(
            {"key": ["value1"]}, "key", default=["DEFAULT"])
        self.assertEqual(["value1"], looked_up)

    def test_found_convert_to_list(self):
        """A bare string value comes back wrapped in a one-element list."""
        looked_up = get_cfg_option_list_or_str({"key": "value1"}, "key")
        self.assertEqual(["value1"], looked_up)

    def test_value_is_none(self):
        """A key explicitly set to None comes back as an empty list."""
        looked_up = get_cfg_option_list_or_str({"key": None}, "key")
        self.assertEqual([], looked_up)
 
101
 
 
102
 
 
103
class TestWriteFile(MockerTestCase):
    """Tests for ``write_file``, run inside a per-test temporary directory."""

    def setUp(self):
        super(TestWriteFile, self).setUp()
        # Make a temp directory for tests to use.
        self.tmp = mkdtemp(prefix="unittest_")

    def tearDown(self):
        super(TestWriteFile, self).tearDown()
        # Clean up temp directory
        rmtree(self.tmp)

    def test_basic_usage(self):
        """Verify basic usage with default args."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        write_file(path, contents)

        self.assertTrue(os.path.exists(path))
        self.assertTrue(os.path.isfile(path))
        with open(path) as f:
            create_contents = f.read()
        self.assertEqual(contents, create_contents)
        file_stat = os.stat(path)
        # "0o" octal literals parse on Python 2.6+ and on Python 3; the
        # bare-zero form (0644) is a syntax error on Python 3.
        self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))

    def test_dir_is_created_if_required(self):
        """Verify that directories are created if required."""
        dirname = os.path.join(self.tmp, "subdir")
        path = os.path.join(dirname, "NewFile.txt")
        contents = "Hey there"

        write_file(path, contents)

        self.assertTrue(os.path.isdir(dirname))
        self.assertTrue(os.path.isfile(path))

    def test_custom_mode(self):
        """Verify custom mode works properly."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        write_file(path, contents, mode=0o666)

        self.assertTrue(os.path.exists(path))
        self.assertTrue(os.path.isfile(path))
        file_stat = os.stat(path)
        self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))

    def test_custom_omode(self):
        """Verify custom omode works properly."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        # Create file first with basic content
        with open(path, "wb") as f:
            # Bytes literal: a binary-mode file rejects str on Python 3.
            f.write(b"LINE1\n")
        write_file(path, contents, omode="a")

        self.assertTrue(os.path.exists(path))
        self.assertTrue(os.path.isfile(path))
        with open(path) as f:
            create_contents = f.read()
        self.assertEqual("LINE1\nHey there", create_contents)

    def test_restorecon_if_possible_is_called(self):
        """Make sure the restorecon_if_possible is called correctly."""
        path = os.path.join(self.tmp, "NewFile.txt")
        contents = "Hey there"

        # Mock out the restorecon_if_possible call to test if it's called.
        mock_restorecon = self.mocker.replace(
            "cloudinit.util.restorecon_if_possible", passthrough=False)
        mock_restorecon(path)
        self.mocker.replay()

        # NOTE(review): no explicit assertion follows; presumably
        # MockerTestCase verifies the recorded expectation once the test
        # finishes -- confirm against the mocker documentation.
        write_file(path, contents)
 
180
 
 
181
 
 
182
class TestDeleteDirContents(TestCase):
    """Tests for ``delete_dir_contents``, run on a per-test temp directory."""

    def setUp(self):
        super(TestDeleteDirContents, self).setUp()
        # Make a temp directory for tests to use.
        self.tmp = mkdtemp(prefix="unittest_")

    def tearDown(self):
        super(TestDeleteDirContents, self).tearDown()
        # Clean up temp directory
        rmtree(self.tmp)

    def assertDirEmpty(self, dirname):
        """Fail unless ``dirname`` lists no entries at all."""
        self.assertEqual([], os.listdir(dirname))

    def test_does_not_delete_dir(self):
        """Ensure directory itself is not deleted."""
        delete_dir_contents(self.tmp)

        self.assertTrue(os.path.isdir(self.tmp))
        self.assertDirEmpty(self.tmp)

    def test_deletes_files(self):
        """Single file should be deleted."""
        with open(os.path.join(self.tmp, "new_file.txt"), "wb") as f:
            # Bytes literal: a binary-mode file rejects str on Python 3.
            f.write(b"DELETE ME")

        delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_empty_dirs(self):
        """Empty directories should be deleted."""
        os.mkdir(os.path.join(self.tmp, "new_dir"))

        delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_nested_dirs(self):
        """Nested directories should be deleted."""
        os.mkdir(os.path.join(self.tmp, "new_dir"))
        os.mkdir(os.path.join(self.tmp, "new_dir", "new_subdir"))

        delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_non_empty_dirs(self):
        """Non-empty directories should be deleted."""
        os.mkdir(os.path.join(self.tmp, "new_dir"))
        f_name = os.path.join(self.tmp, "new_dir", "new_file.txt")
        with open(f_name, "wb") as f:
            f.write(b"DELETE ME")

        delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)

    def test_deletes_symlinks(self):
        """Symlinks should be deleted."""
        file_name = os.path.join(self.tmp, "new_file.txt")
        link_name = os.path.join(self.tmp, "new_file_link.txt")
        with open(file_name, "wb") as f:
            f.write(b"DELETE ME")
        os.symlink(file_name, link_name)

        delete_dir_contents(self.tmp)

        self.assertDirEmpty(self.tmp)