1
# Copyright 2013 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Tests for `ephemerals_script`."""
6
from __future__ import (
22
from maastesting.factory import factory
23
from provisioningserver.import_images import ephemerals_script
24
from provisioningserver.import_images.ephemerals_script import (
26
create_symlinked_image_dir,
27
extract_image_tarball,
28
install_image_from_simplestreams,
30
from provisioningserver.pxe.tftppath import (
34
from provisioningserver.testing.config import ConfigFixture
35
from provisioningserver.testing.testcase import PservTestCase
36
from testtools.matchers import (
40
from provisioningserver.utils import read_text_file
44
"""Return directory and filename component of a file path."""
45
return os.path.dirname(path), os.path.basename(path)
48
class TestHelpers(PservTestCase):
49
def make_target(self):
    """Produce a copy destination: an existing directory and a fresh name.

    The directory comes from `self.make_dir()`; the name comes from
    `factory.make_name()` and no file is created for it here.
    """
    directory = self.make_dir()
    filename = factory.make_name()
    return directory, filename
53
def test_copy_file_by_glob_copies_file(self):
# Creates a source file holding random content, picks a target directory
# and name, and then (per the visible FileContains matchers) expects both
# the source path and the target path to hold that content.
54
content = factory.getRandomString()
55
source_dir, source_name = split_path(self.make_file(contents=content))
56
target_dir, target_name = self.make_target()
59
# NOTE(review): the line opening this call is missing from this extract
# (presumably `copy_file_by_glob(`) -- confirm against the full file.
# The glob is the first three characters of the source name plus '*'.
source_dir, source_name[:3] + '*',
60
target_dir, target_name)
63
# NOTE(review): the assertion opener (presumably `self.assertThat(`) is
# missing before each of the two argument pairs below -- confirm.
os.path.join(source_dir, source_name),
64
FileContains(content))
66
os.path.join(target_dir, target_name),
67
FileContains(content))
69
def test_copy_by_file_returns_target_path(self):
    """`copy_file_by_glob` returns the joined target directory and name."""
    source_path = self.make_file()
    source_dir, source_name = split_path(source_path)
    target_dir, target_name = self.make_target()
    expected_target = os.path.join(target_dir, target_name)

    returned_target = copy_file_by_glob(
        source_dir, source_name, target_dir, target_name)

    self.assertEqual(expected_target, returned_target)
78
def test_copy_file_by_glob_ignores_nonmatching_files(self):
# Puts two files with different random contents in one source directory,
# calls copy_file_by_glob with the exact name of the first one, and then
# checks that the second file is unchanged and that each directory holds
# exactly the expected entries.
79
content = factory.getRandomString()
80
source_dir, source_name = split_path(self.make_file(contents=content))
81
other_content = factory.getRandomString()
82
other_file = factory.make_file(source_dir, contents=other_content)
83
target_dir, target_name = self.make_target()
85
copy_file_by_glob(source_dir, source_name, target_dir, target_name)
87
self.assertThat(other_file, FileContains(other_content))
89
# NOTE(review): the assertion opener (presumably `self.assertThat(`) is
# missing from this extract before the next two lines -- confirm.
os.path.join(target_dir, target_name),
90
FileContains(content))
91
# The source directory still holds both files, nothing more.
self.assertItemsEqual(
92
[source_name, os.path.basename(other_file)],
93
os.listdir(source_dir))
94
# The target directory holds only the target name.
self.assertItemsEqual([target_name], os.listdir(target_dir))
96
def test_copy_file_by_glob_fails_if_no_files_match(self):
# NOTE(review): the statement that wraps the arguments below is missing
# from this extract (presumably an `assertRaises` around
# `copy_file_by_glob`); the expected exception type cannot be determined
# from here -- confirm against the full file.
# The visible arguments are a fresh source directory, a random name with
# '*' appended as the glob, a fresh target directory and a random name.
100
self.make_dir(), factory.make_name() + '*',
101
self.make_dir(), factory.make_name())
103
def test_copy_file_by_glob_fails_if_multiple_files_match(self):
# Creates two files in one source directory so that the '*' glob used
# below has more than one candidate.
104
source_dir = self.make_dir()
105
factory.make_file(source_dir)
106
factory.make_file(source_dir)
111
# NOTE(review): the statement that wraps these arguments is missing from
# this extract (presumably an `assertRaises` around `copy_file_by_glob`);
# the expected exception type is not visible -- confirm.
source_dir, '*', self.make_dir(), factory.make_name())
114
class TestExtractImageTarball(PservTestCase):
115
"""Tests for `extract_image_tarball`."""
117
def test_copies_boot_image_files_from_tarball(self):
# Builds a tarball with a kernel, an initrd and a disk image named after a
# random prefix, runs extract_image_tarball on it, and expects the target
# directory to hold them as 'linux', 'initrd.gz' and 'disk.img' with the
# matching contents.
118
prefix = factory.make_name()
119
kernel_content = factory.getRandomString()
120
initrd_content = factory.getRandomString()
121
img_content = factory.getRandomString()
122
tarball = factory.make_tarball(self.make_dir(), {
123
'%s-vmlinuz.gz' % prefix: kernel_content,
124
'%s-initrd.gz' % prefix: initrd_content,
125
'%s.img' % prefix: img_content,
# NOTE(review): the line closing the dict and the call is missing from
# this extract -- confirm against the full file.
127
target_dir = self.make_dir()
128
# Replace call_uec2roottar on the module under test with a patch so the
# real one is not invoked.
self.patch(ephemerals_script, 'call_uec2roottar')
130
extract_image_tarball(tarball, target_dir)
132
self.assertItemsEqual(
133
['linux', 'initrd.gz', 'disk.img'],
# NOTE(review): the second argument of the assertion above (presumably a
# listing of target_dir) is missing from this extract -- confirm.
136
# NOTE(review): the assertion opener (presumably `self.assertThat(`) is
# missing before each of the three argument pairs below -- confirm.
os.path.join(target_dir, 'linux'),
137
FileContains(kernel_content))
139
os.path.join(target_dir, 'initrd.gz'),
140
FileContains(initrd_content))
142
os.path.join(target_dir, 'disk.img'),
143
FileContains(img_content))
145
def test_ignores_extraneous_files_in_tarball(self):
# Runs extract_image_tarball on a tarball and expects exactly 'linux',
# 'initrd.gz' and 'disk.img' as the result.
146
prefix = factory.make_name()
147
tarball = factory.make_tarball(self.make_dir(), {
148
'%s-vmlinuz.gz' % prefix: None,
149
'%s-initrd.gz' % prefix: None,
150
'%s.img' % prefix: None,
# NOTE(review): the remaining dict entries (presumably the extraneous
# file this test is about) and the closing of the call are missing from
# this extract -- confirm against the full file.
153
target_dir = self.make_dir()
154
# Replace call_uec2roottar on the module under test with a patch so the
# real one is not invoked.
self.patch(ephemerals_script, 'call_uec2roottar')
156
extract_image_tarball(tarball, target_dir)
158
self.assertItemsEqual(
159
['linux', 'initrd.gz', 'disk.img'],
# NOTE(review): the second argument of the assertion above (presumably a
# listing of target_dir) is missing from this extract -- confirm.
162
def test_runs_uec2roottar(self):
# Patches subprocess.check_call and ephemerals_script.copy_file_by_glob,
# calls extract_image_tarball with a made-up tarball name, and inspects
# the arguments check_call received.
163
check_call = self.patch(subprocess, 'check_call')
164
fake_image = factory.make_name('image')
165
self.patch(ephemerals_script, 'copy_file_by_glob').return_value = (
# NOTE(review): the continuation of the assignment above is missing from
# this extract (presumably `fake_image)`) -- confirm.
167
tarball = factory.make_name('tarball') + '.tar.gz'
168
target_dir = self.make_dir()
170
extract_image_tarball(tarball, target_dir)
172
check_call.assert_called_with([
# NOTE(review): the other elements of the expected argument list and the
# closing brackets are missing from this extract; only the
# 'dist-root.tar.gz' path inside target_dir is visible -- confirm.
175
os.path.join(target_dir, 'dist-root.tar.gz'),
178
def test_cleans_up_temp_location(self):
# Calls extract_image_tarball with an explicit temp_location (third
# positional argument) while subprocess.check_call and copy_file_by_glob
# are patched, then expects temp_location to be empty.
179
self.patch(subprocess, 'check_call')
180
fake_image = factory.make_name('image')
181
self.patch(ephemerals_script, 'copy_file_by_glob').return_value = (
# NOTE(review): the continuation of the assignment above is missing from
# this extract (presumably `fake_image)`) -- confirm.
183
tarball = factory.make_name('tarball') + '.tar.gz'
184
target_dir = self.make_dir()
185
temp_location = self.make_dir()
187
extract_image_tarball(tarball, target_dir, temp_location)
189
self.assertItemsEqual([], listdir(temp_location))
191
def test_cleans_up_after_failure(self):
# Makes the patched subprocess.check_call raise a locally defined
# exception, then expects temp_location to be empty afterwards.
192
class DeliberateFailure(RuntimeError):
# NOTE(review): the class body is missing from this extract (presumably
# `pass`) -- confirm against the full file.
195
self.patch(subprocess, 'check_call').side_effect = DeliberateFailure()
196
fake_image = factory.make_name('image')
197
self.patch(ephemerals_script, 'copy_file_by_glob').return_value = (
# NOTE(review): the continuation of the assignment above is missing from
# this extract (presumably `fake_image)`) -- confirm.
199
tarball = factory.make_name('tarball') + '.tar.gz'
200
target_dir = self.make_dir()
201
temp_location = self.make_dir()
205
# NOTE(review): the opener of this statement is missing from this extract
# (presumably `self.assertRaises(` with DeliberateFailure) -- confirm.
extract_image_tarball, tarball, target_dir, temp_location)
207
self.assertItemsEqual([], listdir(temp_location))
210
class TestCreateSymlinkedImageDir(PservTestCase):
211
"""Tests for `create_symlinked_image_dir`."""
213
def make_original_dir(self):
214
"""Create a directory with the kernel, initrd and root tarball."""
215
original_dir = self.make_dir()
216
# The three files are named 'linux', 'initrd.gz' and 'dist-root.tar.gz'.
factory.make_file(original_dir, 'linux')
217
factory.make_file(original_dir, 'initrd.gz')
218
factory.make_file(original_dir, 'dist-root.tar.gz')
# NOTE(review): no return statement is visible in this extract, yet
# callers assign this method's result to `original_dir`; presumably a
# `return original_dir` line is missing here -- confirm.
221
def test_symlinks_files(self):
# Calls create_symlinked_image_dir and checks that the returned directory
# is a new one located under temp_location, and that its entries are
# links pointing back at the files in the original directory.
222
original_dir = self.make_original_dir()
223
temp_location = self.make_dir()
225
image_dir = create_symlinked_image_dir(original_dir, temp_location)
227
self.assertNotEqual(original_dir, image_dir)
228
self.assertNotEqual(temp_location, image_dir)
229
self.assertThat(image_dir, StartsWith(temp_location + '/'))
230
self.assertItemsEqual(
231
['linux', 'initrd.gz', 'root.tar.gz'],
# NOTE(review): the second argument of the assertion above (presumably a
# listing of image_dir) is missing from this extract -- confirm.
234
# NOTE(review): the assertion opener (presumably `self.assertEqual(`) is
# missing before each of the three argument pairs below -- confirm.
# Note that the link named 'root.tar.gz' is compared with the original
# 'dist-root.tar.gz' path.
os.path.join(original_dir, 'linux'),
235
readlink(os.path.join(image_dir, 'linux')))
237
os.path.join(original_dir, 'initrd.gz'),
238
readlink(os.path.join(image_dir, 'initrd.gz')))
240
os.path.join(original_dir, 'dist-root.tar.gz'),
241
readlink(os.path.join(image_dir, 'root.tar.gz')))
243
def test_cleans_up_temp_location(self):
    """Only the returned image directory remains in the temp location."""
    source_dir = self.make_original_dir()
    scratch_dir = self.make_dir()

    result_dir = create_symlinked_image_dir(source_dir, scratch_dir)

    # The result itself is the sole entry left in the scratch area.
    expected_entries = [os.path.basename(result_dir)]
    self.assertItemsEqual(expected_entries, listdir(scratch_dir))
254
def test_cleans_up_after_failure(self):
# Makes the patched ephemerals_script.symlink raise a locally defined
# exception, then expects temp_location to be empty afterwards.
255
class DeliberateFailure(RuntimeError):
# NOTE(review): the class body is missing from this extract (presumably
# `pass`) -- confirm against the full file.
258
self.patch(ephemerals_script, 'symlink').side_effect = (
259
DeliberateFailure("Symlinking intentionally broken"))
260
original_dir = self.make_dir()
261
temp_location = self.make_dir()
265
# NOTE(review): the opener of this statement is missing from this extract
# (presumably `self.assertRaises(` with DeliberateFailure) -- confirm.
create_symlinked_image_dir, original_dir, temp_location)
267
self.assertItemsEqual([], listdir(temp_location))
270
class TestInstallImageFromSimplestreams(PservTestCase):
271
"""Tests for `install_image_from_simplestreams`."""
273
def prepare_storage_dir(self):
274
"""Set up a storage directory with kernel, initrd, and root tarball."""
275
storage = self.make_dir()
276
# The three files are named 'linux', 'initrd.gz' and 'dist-root.tar.gz'.
factory.make_file(storage, 'linux')
277
factory.make_file(storage, 'initrd.gz')
278
factory.make_file(storage, 'dist-root.tar.gz')
# NOTE(review): no return statement is visible in this extract, yet
# callers assign this method's result to `storage_dir`; presumably a
# `return storage` line is missing here -- confirm.
281
def patch_config(self, tftp_root):
    """Install a `ConfigFixture` whose TFTP root is `tftp_root`."""
    tftp_settings = {'root': tftp_root}
    fixture = ConfigFixture({'tftp': tftp_settings})
    self.useFixture(fixture)
285
def test_installs_image(self):
# Points the config at a fresh TFTP root, installs an image from a
# prepared storage directory for a random release and architecture, and
# checks the files found in the computed install directory.
286
tftp_root = self.make_dir()
287
self.patch_config(tftp_root)
288
storage_dir = self.prepare_storage_dir()
289
release = factory.make_name('release')
290
arch = factory.make_name('arch')
292
install_image_from_simplestreams(
293
storage_dir, release=release, arch=arch)
295
# The expected location is composed from the architecture, the literal
# 'generic', the release and the literal 'commissioning'.
install_dir = locate_tftp_path(
296
compose_image_path(arch, 'generic', release, 'commissioning'),
# NOTE(review): the rest of the locate_tftp_path call is missing from
# this extract (presumably it passes tftp_root) -- confirm.
298
self.assertItemsEqual(
299
['linux', 'initrd.gz', 'root.tar.gz'],
300
listdir(install_dir))
302
# NOTE(review): the assertion opener (presumably `self.assertThat(`) is
# missing from this extract before the next two lines -- confirm.
# The installed 'linux' is compared with the text of the stored 'linux'.
os.path.join(install_dir, 'linux'),
303
FileContains(read_text_file(os.path.join(storage_dir, 'linux'))))
305
def test_cleans_up_temp_location(self):
    """A successful install leaves nothing behind in `temp_location`."""
    # Patch out the install step on the module under test.
    self.patch(ephemerals_script, 'install_image')
    scratch_dir = self.make_dir()
    storage_dir = self.prepare_storage_dir()
    release = factory.make_name('release')
    arch = factory.make_name('arch')

    install_image_from_simplestreams(
        storage_dir, release=release, arch=arch, temp_location=scratch_dir)

    self.assertItemsEqual([], listdir(scratch_dir))
316
def test_cleans_up_after_failure(self):
# Gives the patched ephemerals_script.install_image a side effect, calls
# install_image_from_simplestreams with an explicit temp_location, and
# expects temp_location to be empty afterwards.
317
class DeliberateFailure(RuntimeError):
# NOTE(review): the class body is missing from this extract (presumably
# `pass`) -- confirm against the full file.
320
self.patch(ephemerals_script, 'install_image').side_effect = (
# NOTE(review): the continuation of the assignment above is missing from
# this extract (presumably a DeliberateFailure instance) -- confirm.
322
temp_location = self.make_dir()
323
storage_dir = self.prepare_storage_dir()
327
# NOTE(review): the opener of this statement is missing from this extract
# (presumably `self.assertRaises(` with DeliberateFailure) -- confirm.
install_image_from_simplestreams,
328
storage_dir, release=factory.make_name('release'),
329
arch=factory.make_name('arch'), temp_location=temp_location)
331
self.assertItemsEqual([], listdir(temp_location))