1
"""Tests for handling of userdata within cloud init."""
10
from unittest import mock
14
from six import BytesIO, StringIO
16
from email import encoders
17
from email.mime.application import MIMEApplication
18
from email.mime.base import MIMEBase
19
from email.mime.multipart import MIMEMultipart
21
from cloudinit import handlers
22
from cloudinit import helpers as c_helpers
23
from cloudinit import log
24
from cloudinit.settings import (PER_INSTANCE)
25
from cloudinit import sources
26
from cloudinit import stages
27
from cloudinit import user_data as ud
28
from cloudinit import util
33
INSTANCE_ID = "i-testing"
36
class FakeDataSource(sources.DataSource):
    """Stand-in datasource that serves canned user-data and vendor-data."""

    def __init__(self, userdata=None, vendordata=None):
        """Initialise the base DataSource and store the raw payloads.

        userdata: value exposed as ``userdata_raw``.
        vendordata: value exposed as ``vendordata_raw``.
        """
        base_init = sources.DataSource.__init__
        base_init(self, {}, None, None)
        # Every fake reports the same module-level instance id.
        self.metadata = {'instance-id': INSTANCE_ID}
        self.userdata_raw, self.vendordata_raw = userdata, vendordata
45
def count_messages(root):
# Helper used by the TestUDProcess tests, which compare its result to 1.
# NOTE(review): only the ud.is_skippable(m) check is visible in this excerpt;
# the loop that binds `m` and the return statement are not shown. Presumably
# it counts the messages under `root` that are not skippable -- confirm.
48
if ud.is_skippable(m):
56
# NOTE(review): presumably the body of the gzip_text() helper that other tests
# in this file call; its `def` line and the creation of `contents` are not
# part of this excerpt.
# Encode `text` and write it through a GzipFile that targets `contents`.
f = gzip.GzipFile(fileobj=contents, mode='wb')
57
f.write(util.encode_text(text))
60
# Hand back whatever has accumulated in the `contents` buffer.
return contents.getvalue()
63
# FIXME: these tests shouldn't be checking log output??
65
class TestConsumeUserData(helpers.FilesystemMockingTestCase):
# Test case built on helpers.FilesystemMockingTestCase; the statements below
# chain to that base class's setUp() and tearDown().
# NOTE(review): the `def setUp` / `def tearDown` header lines are not part of
# this excerpt, so these statements appear without their method headers.
68
super(TestConsumeUserData, self).setUp()
71
# No log handler is attached until capture_log() installs one.
self._log_handler = None
74
# Detach the handler capture_log() added (if any) before the base teardown.
if self._log_handler and self._log:
75
self._log.removeHandler(self._log_handler)
76
helpers.FilesystemMockingTestCase.tearDown(self)
78
# Called by the vendor-data tests in this class with a temporary root path.
# NOTE(review): the body of this method is not part of this excerpt.
def _patchIn(self, root):
82
def capture_log(self, lvl=logging.DEBUG):
# Attach a logging.StreamHandler at level `lvl` to the logger returned by
# log.getLogger(); both are kept on self so they can be detached later.
# NOTE(review): the creation of `log_file` and the return statement are not
# part of this excerpt; callers in this file call getvalue() on the result.
84
self._log_handler = logging.StreamHandler(log_file)
85
self._log_handler.setLevel(lvl)
86
self._log = log.getLogger()
87
self._log.addHandler(self._log_handler)
90
def test_simple_jsonp(self):
# A datasource whose user-data is `blob` must produce a cloud_config file
# holding exactly two keys: baz == 'qux' and bar == 'qux2'.
# NOTE(review): the two "add" operations below are presumably the content of
# `blob`; the lines that open/close that literal, create `ci` and trigger
# consumption are not part of this excerpt.
94
{ "op": "add", "path": "/baz", "value": "qux" },
95
{ "op": "add", "path": "/bar", "value": "qux2" }
100
ci.datasource = FakeDataSource(blob)
101
new_root = tempfile.mkdtemp()
102
self.addCleanup(shutil.rmtree, new_root)
103
self.patchUtils(new_root)
104
self.patchOS(new_root)
107
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
108
cc = util.load_yaml(cc_contents)
109
self.assertEqual(2, len(cc))
110
self.assertEqual('qux', cc['baz'])
111
self.assertEqual('qux2', cc['bar'])
113
def test_simple_jsonp_vendor_and_user(self):
# Both user-data and vendor-data are supplied to the datasource. The final
# config must contain 'vendor_data', keep baz == 'qux' and bar == 'qux2',
# and also carry foo == 'quxC'.
# NOTE(review): presumably the first patch list below is `user_blob` and the
# second is `vendor_blob`; their assignments, the remaining arguments of the
# run('consume_data', ...) call and the definition of `cfg` are not part of
# this excerpt.
114
# test that user-data wins over vendor
118
{ "op": "add", "path": "/baz", "value": "qux" },
119
{ "op": "add", "path": "/bar", "value": "qux2" }
125
{ "op": "add", "path": "/baz", "value": "quxA" },
126
{ "op": "add", "path": "/bar", "value": "quxB" },
127
{ "op": "add", "path": "/foo", "value": "quxC" }
130
new_root = tempfile.mkdtemp()
131
self.addCleanup(shutil.rmtree, new_root)
132
self._patchIn(new_root)
133
initer = stages.Init()
134
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
140
initer.cloudify().run('consume_data',
144
mods = stages.Modules(initer)
145
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
147
self.assertIn('vendor_data', cfg)
148
self.assertEqual('qux', cfg['baz'])
149
self.assertEqual('qux2', cfg['bar'])
150
self.assertEqual('quxC', cfg['foo'])
152
def test_simple_jsonp_no_vendor_consumed(self):
# Same setup as the vendor-and-user test, but one patch list also adds
# /vendor_data with {"enabled": "false"}. The final config must keep
# baz == 'qux' and bar == 'qux2' and must NOT contain 'foo'.
# NOTE(review): presumably the first patch list is `user_blob` and the second
# is `vendor_blob`; their assignments, the remaining arguments of the
# run('consume_data', ...) call and the definition of `cfg` are not part of
# this excerpt.
153
# make sure that vendor data is not consumed
157
{ "op": "add", "path": "/baz", "value": "qux" },
158
{ "op": "add", "path": "/bar", "value": "qux2" },
159
{ "op": "add", "path": "/vendor_data", "value": {"enabled": "false"}}
165
{ "op": "add", "path": "/baz", "value": "quxA" },
166
{ "op": "add", "path": "/bar", "value": "quxB" },
167
{ "op": "add", "path": "/foo", "value": "quxC" }
170
new_root = tempfile.mkdtemp()
171
self.addCleanup(shutil.rmtree, new_root)
172
self._patchIn(new_root)
173
initer = stages.Init()
174
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
180
initer.cloudify().run('consume_data',
184
mods = stages.Modules(initer)
185
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
187
self.assertEqual('qux', cfg['baz'])
188
self.assertEqual('qux2', cfg['bar'])
189
self.assertNotIn('foo', cfg)
191
def test_mixed_cloud_config(self):
# Build a multipart message with a text/cloud-config part and a
# text/cloud-config-jsonp part, use its string form as user-data, and expect
# the resulting cloud_config to hold a single key: a == 'c'.
# NOTE(review): the "replace"/"remove" operations below are presumably the
# content of `blob_jp`; the definitions of `blob_cc`, `blob_jp` and `ci` and
# the call that consumes the data are not part of this excerpt.
197
message_cc = MIMEBase("text", "cloud-config")
198
message_cc.set_payload(blob_cc)
203
{ "op": "replace", "path": "/a", "value": "c" },
204
{ "op": "remove", "path": "/c" }
208
message_jp = MIMEBase('text', "cloud-config-jsonp")
209
message_jp.set_payload(blob_jp)
211
message = MIMEMultipart()
212
message.attach(message_cc)
213
message.attach(message_jp)
216
ci.datasource = FakeDataSource(str(message))
217
new_root = tempfile.mkdtemp()
218
self.addCleanup(shutil.rmtree, new_root)
219
self.patchUtils(new_root)
220
self.patchOS(new_root)
223
cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
224
cc = util.load_yaml(cc_contents)
225
self.assertEqual(1, len(cc))
226
self.assertEqual('c', cc['a'])
228
def test_vendor_user_yaml_cloud_config(self):
# User-data and vendor-data are both handed to the datasource. The final
# config must contain 'vendor_data', have a == 'c' and name == 'user', and
# its 'run' entry must contain 'z' but neither 'x' nor 'y'.
# NOTE(review): the definitions of `user_blob`, `vendor_blob` and `cfg` and
# the remaining arguments of the run('consume_data', ...) call are not part
# of this excerpt.
248
new_root = tempfile.mkdtemp()
249
self.addCleanup(shutil.rmtree, new_root)
250
self._patchIn(new_root)
251
initer = stages.Init()
252
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
258
initer.cloudify().run('consume_data',
262
mods = stages.Modules(initer)
263
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
265
self.assertIn('vendor_data', cfg)
266
self.assertEqual('c', cfg['a'])
267
self.assertEqual('user', cfg['name'])
268
self.assertNotIn('x', cfg['run'])
269
self.assertNotIn('y', cfg['run'])
270
self.assertIn('z', cfg['run'])
272
def test_vendordata_script(self):
# After consuming the data and running the 'cloud_init_modules' section, a
# file named part-001 must exist under the instance's 'vendor_scripts' path
# (looked up with get_ipath_cur and prefixed with the temporary root).
# NOTE(review): the definitions of `user_blob` and `vendor_blob` and the
# remaining arguments of the run('consume_data', ...) call are not part of
# this excerpt.
284
new_root = tempfile.mkdtemp()
285
self.addCleanup(shutil.rmtree, new_root)
286
self._patchIn(new_root)
287
initer = stages.Init()
288
initer.datasource = FakeDataSource(user_blob, vendordata=vendor_blob)
294
initer.cloudify().run('consume_data',
298
mods = stages.Modules(initer)
299
(_which_ran, _failures) = mods.run_section('cloud_init_modules')
300
vendor_script = initer.paths.get_ipath_cur('vendor_scripts')
301
vendor_script_fns = "%s%s/part-001" % (new_root, vendor_script)
302
self.assertTrue(os.path.exists(vendor_script_fns))
304
def test_merging_cloud_config(self):
# Drive CloudConfigPartHandler directly: send CONTENT_START, then three
# text/cloud-config parts (the second carries an X-Merge-Type header), then
# CONTENT_END. The cloud_config file written by the handler must load as
# run == ['b', 'c', 'stuff', 'morestuff'], a == 'be', e == [1, 2, 3], p == 1.
# NOTE(review): the definitions of `blob`, `blob2`, `blob3` and `headers`
# and the trailing arguments of the CONTENT_START / CONTENT_END calls are not
# part of this excerpt.
313
message1 = MIMEBase("text", "cloud-config")
314
message1.set_payload(blob)
324
message2 = MIMEBase("text", "cloud-config")
325
message2['X-Merge-Type'] = ('dict(recurse_array,'
326
'recurse_str)+list(append)+str(append)')
327
message2.set_payload(blob2)
337
message3 = MIMEBase("text", "cloud-config")
338
message3.set_payload(blob3)
340
messages = [message1, message2, message3]
342
paths = c_helpers.Paths({}, ds=FakeDataSource(''))
343
cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
345
new_root = tempfile.mkdtemp()
346
self.addCleanup(shutil.rmtree, new_root)
347
self.patchUtils(new_root)
348
self.patchOS(new_root)
349
cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
351
for i, m in enumerate(messages):
353
fn = "part-%s" % (i + 1)
354
payload = m.get_payload(decode=True)
355
cloud_cfg.handle_part(None, headers['Content-Type'],
356
fn, payload, None, headers)
357
cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
359
contents = util.load_file(paths.get_ipath('cloud_config'))
360
contents = util.load_yaml(contents)
361
self.assertEqual(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
362
self.assertEqual(contents['a'], 'be')
363
self.assertEqual(contents['e'], [1, 2, 3])
364
self.assertEqual(contents['p'], 1)
366
def test_unhandled_type_warning(self):
# cloudinit.util.write_file is mocked while the plain-text user-data is
# handled; the mock must have been called exactly once, with the
# cloud_config path, an empty string and mode 0o600.
# NOTE(review): the warning text below is presumably asserted against the
# log captured at WARNING level; that assertion, the creation of `ci` and
# the call that consumes the data are not part of this excerpt.
367
"""Raw text without magic is ignored but shows warning."""
369
data = "arbitrary text\n"
370
ci.datasource = FakeDataSource(data)
372
with mock.patch('cloudinit.util.write_file') as mockobj:
373
log_file = self.capture_log(logging.WARNING)
377
"Unhandled non-multipart (text/x-not-multipart) userdata:",
380
mockobj.assert_called_once_with(
381
ci.paths.get_ipath("cloud_config"), "", 0o600)
383
def test_mime_gzip_compressed(self):
# Two parts, each wrapped by gzip_part(), are attached to a multipart
# message whose string form becomes the user-data. The resulting
# cloud_config must load as a dict with three entries: a == 2, b == 3, c == 4.
# NOTE(review): the `return MIMEApplication(...)` line presumably belongs to
# the local gzip_part(text) helper whose `def` line is not in this excerpt;
# `base_content1`, `base_content2`, `ci` and the consume call are also not
# shown.
384
"""Tests that individual message gzip encoding works."""
387
return MIMEApplication(gzip_text(text), 'gzip')
400
message = MIMEMultipart('test')
401
message.attach(gzip_part(base_content1))
402
message.attach(gzip_part(base_content2))
404
ci.datasource = FakeDataSource(str(message))
405
new_root = tempfile.mkdtemp()
406
self.addCleanup(shutil.rmtree, new_root)
407
self.patchUtils(new_root)
408
self.patchOS(new_root)
411
contents = util.load_file(ci.paths.get_ipath("cloud_config"))
412
contents = util.load_yaml(contents)
413
self.assertTrue(isinstance(contents, dict))
414
self.assertEqual(3, len(contents))
415
self.assertEqual(2, contents['a'])
416
self.assertEqual(3, contents['b'])
417
self.assertEqual(4, contents['c'])
419
def test_mime_text_plain(self):
# The user-data is a text/plain MIME message ("Just text"), passed to the
# datasource as encoded bytes. With cloudinit.util.write_file mocked, the
# mock must have been called exactly once, with the cloud_config path, an
# empty string and mode 0o600.
# NOTE(review): the warning text below is presumably asserted against the
# log captured at WARNING level; that assertion, the creation of `ci` and
# the call that consumes the data are not part of this excerpt.
420
"""Mime message of type text/plain is ignored but shows warning."""
422
message = MIMEBase("text", "plain")
423
message.set_payload("Just text")
424
ci.datasource = FakeDataSource(message.as_string().encode())
426
with mock.patch('cloudinit.util.write_file') as mockobj:
427
log_file = self.capture_log(logging.WARNING)
431
"Unhandled unknown content-type (text/plain)",
433
mockobj.assert_called_once_with(
434
ci.paths.get_ipath("cloud_config"), "", 0o600)
436
def test_shellscript(self):
# The raw script text is the user-data. With cloudinit.util.write_file
# mocked, nothing may be logged at WARNING level, and the mock's calls must
# include writing the script to scripts/part-001 with mode 0o700 followed by
# writing an empty cloud_config with mode 0o600.
# NOTE(review): the creation of `ci` and the call that consumes the data
# are not part of this excerpt.
437
"""Raw text starting #!/bin/sh is treated as script."""
439
script = "#!/bin/sh\necho hello\n"
440
ci.datasource = FakeDataSource(script)
442
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
444
with mock.patch('cloudinit.util.write_file') as mockobj:
445
log_file = self.capture_log(logging.WARNING)
448
self.assertEqual("", log_file.getvalue())
450
mockobj.assert_has_calls([
451
mock.call(outpath, script, 0o700),
452
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
454
def test_mime_text_x_shellscript(self):
# The script is wrapped in a text/x-shellscript MIME message whose string
# form is the user-data. With cloudinit.util.write_file mocked, nothing may
# be logged at WARNING level, and the mock's calls must include writing the
# script to scripts/part-001 with mode 0o700 followed by writing an empty
# cloud_config with mode 0o600.
# NOTE(review): the creation of `ci` and the call that consumes the data
# are not part of this excerpt.
455
"""Mime message of type text/x-shellscript is treated as script."""
457
script = "#!/bin/sh\necho hello\n"
458
message = MIMEBase("text", "x-shellscript")
459
message.set_payload(script)
460
ci.datasource = FakeDataSource(message.as_string())
462
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
464
with mock.patch('cloudinit.util.write_file') as mockobj:
465
log_file = self.capture_log(logging.WARNING)
468
self.assertEqual("", log_file.getvalue())
470
mockobj.assert_has_calls([
471
mock.call(outpath, script, 0o700),
472
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
474
def test_mime_text_plain_shell(self):
# The script is wrapped in a text/plain MIME message whose string form is
# the user-data. With cloudinit.util.write_file mocked, nothing may be
# logged at WARNING level, and the mock's calls must include writing the
# script to scripts/part-001 with mode 0o700 followed by writing an empty
# cloud_config with mode 0o600.
# NOTE(review): the creation of `ci` and the call that consumes the data
# are not part of this excerpt.
475
"""Mime type text/plain starting #!/bin/sh is treated as script."""
477
script = "#!/bin/sh\necho hello\n"
478
message = MIMEBase("text", "plain")
479
message.set_payload(script)
480
ci.datasource = FakeDataSource(message.as_string())
482
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
484
with mock.patch('cloudinit.util.write_file') as mockobj:
485
log_file = self.capture_log(logging.WARNING)
488
self.assertEqual("", log_file.getvalue())
490
mockobj.assert_has_calls([
491
mock.call(outpath, script, 0o700),
492
mock.call(ci.paths.get_ipath("cloud_config"), "", 0o600)])
494
def test_mime_application_octet_stream(self):
# The user-data is an application/octet-stream MIME message with a binary
# payload, base64-encoded via encoders.encode_base64 and passed to the
# datasource as encoded bytes. With cloudinit.util.write_file mocked, the
# mock must have been called exactly once, with the cloud_config path, an
# empty string and mode 0o600.
# NOTE(review): the warning text below is presumably asserted against the
# log captured at WARNING level; that assertion, the creation of `ci` and
# the call that consumes the data are not part of this excerpt.
495
"""Mime type application/octet-stream is ignored but shows warning."""
497
message = MIMEBase("application", "octet-stream")
498
message.set_payload(b'\xbf\xe6\xb2\xc3\xd3\xba\x13\xa4\xd8\xa1\xcc')
499
encoders.encode_base64(message)
500
ci.datasource = FakeDataSource(message.as_string().encode())
502
with mock.patch('cloudinit.util.write_file') as mockobj:
503
log_file = self.capture_log(logging.WARNING)
507
"Unhandled unknown content-type (application/octet-stream)",
509
mockobj.assert_called_once_with(
510
ci.paths.get_ipath("cloud_config"), "", 0o600)
512
def test_cloud_config_archive(self):
# The user-data is a '#cloud-config-archive' header followed by the YAML
# dump of three entries: two cloud-config documents and one entry whose
# content is a bytes value (`non_decodable`). util.write_file is mocked with
# fsstore(), which records each written content under its filename in `fs`.
# The recorded cloud_config must load with password == 'gocubs' and
# locale == 'chicago'.
# NOTE(review): the creation of `ci` and `fs` and the call that consumes
# the data are not part of this excerpt.
513
non_decodable = b'\x11\xc9\xb4gTH\xee\x12'
514
data = [{'content': '#cloud-config\npassword: gocubs\n'},
515
{'content': '#cloud-config\nlocale: chicago\n'},
516
{'content': non_decodable}]
517
message = b'#cloud-config-archive\n' + util.yaml_dumps(data).encode()
520
ci.datasource = FakeDataSource(message)
524
def fsstore(filename, content, mode=0o0644, omode="wb"):
525
fs[filename] = content
527
# consuming the user-data provided should write 'cloud_config' file
528
# which will have our yaml in it.
529
with mock.patch('cloudinit.util.write_file') as mockobj:
530
mockobj.side_effect = fsstore
534
cfg = util.load_yaml(fs[ci.paths.get_ipath("cloud_config")])
535
self.assertEqual(cfg.get('password'), 'gocubs')
536
self.assertEqual(cfg.get('locale'), 'chicago')
539
class TestUDProcess(helpers.ResourceUsingTestCase):
    """Check UserDataProcessor.process() with bytes, text and gzip input.

    Each test feeds the same cloud-config document in a different form and
    expects count_messages() to report exactly one message in the result.
    """

    def test_bytes_in_userdata(self):
        """A bytes payload is processed into a single counted message."""
        msg = b'#cloud-config\napt_update: True\n'
        ud_proc = ud.UserDataProcessor(self.getCloudPaths())
        message = ud_proc.process(msg)
        # assertEqual reports the actual count on failure, unlike
        # assertTrue(x == 1).
        self.assertEqual(1, count_messages(message))

    def test_string_in_userdata(self):
        """A text payload is processed into a single counted message."""
        msg = '#cloud-config\napt_update: True\n'

        ud_proc = ud.UserDataProcessor(self.getCloudPaths())
        message = ud_proc.process(msg)
        self.assertEqual(1, count_messages(message))

    def test_compressed_in_userdata(self):
        """A gzip_text() payload is processed into a single counted message."""
        msg = gzip_text('#cloud-config\napt_update: True\n')

        ud_proc = ud.UserDataProcessor(self.getCloudPaths())
        message = ud_proc.process(msg)
        self.assertEqual(1, count_messages(message))
562
class TestConvertString(helpers.TestCase):
# Tests for ud.convert_string(): each test converts a value and compares it
# with the resulting message's payload (decoded for the two binary cases,
# undecoded for the header case).
# NOTE(review): the assignments of `blob` and `text` are not part of this
# excerpt, so the exact inputs cannot be described here.
563
def test_handles_binary_non_utf8_decodable(self):
565
msg = ud.convert_string(blob)
566
self.assertEqual(blob, msg.get_payload(decode=True))
568
def test_handles_binary_utf8_decodable(self):
570
msg = ud.convert_string(blob)
571
self.assertEqual(blob, msg.get_payload(decode=True))
573
def test_handle_headers(self):
575
msg = ud.convert_string(text)
576
self.assertEqual(text, msg.get_payload(decode=False))