136
137
(mod, 'wait_for_files', _wait_for_files),
137
138
(mod, 'pubkeys_from_crt_files',
138
139
_pubkeys_from_crt_files),
139
(mod, 'iid_from_shared_config',
140
_iid_from_shared_config)])
140
(mod, 'get_instance_id', _get_instance_id)])
142
142
dsrc = mod.DataSourceAzureNet(
143
143
data.get('sys_cfg', {}), distro=None, paths=self.paths)
279
280
self.assertEqual(dsrc.userdata_raw, mydata)
281
282
def test_no_datasource_expected(self):
282
#no source should be found if no seed_dir and no devs
283
# no source should be found if no seed_dir and no devs
284
285
dsrc = self._get_ds({})
285
286
ret = dsrc.get_data()
286
287
self.assertFalse(ret)
287
288
self.assertFalse('agent_invoked' in data)
289
def test_cfg_has_pubkeys(self):
290
odata = {'HostName': "myhost", 'UserName': "myuser"}
291
mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
292
pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
293
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
296
dsrc = self._get_ds(data)
297
ret = dsrc.get_data()
299
for mypk in mypklist:
300
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
290
def test_cfg_has_pubkeys_fingerprint(self):
291
odata = {'HostName': "myhost", 'UserName': "myuser"}
292
mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
293
pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
294
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
297
dsrc = self._get_ds(data)
298
ret = dsrc.get_data()
300
for mypk in mypklist:
301
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
302
self.assertIn('pubkey_from', dsrc.metadata['public-keys'][-1])
304
def test_cfg_has_pubkeys_value(self):
305
# make sure that provided key is used over fingerprint
306
odata = {'HostName': "myhost", 'UserName': "myuser"}
307
mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': 'value1'}]
308
pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
309
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
312
dsrc = self._get_ds(data)
313
ret = dsrc.get_data()
316
for mypk in mypklist:
317
self.assertIn(mypk, dsrc.cfg['_pubkeys'])
318
self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
320
def test_cfg_has_no_fingerprint_has_value(self):
321
# test value is used when fingerprint not provided
322
odata = {'HostName': "myhost", 'UserName': "myuser"}
323
mypklist = [{'fingerprint': None, 'path': 'path1', 'value': 'value1'}]
324
pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
325
data = {'ovfcontent': construct_valid_ovf_env(data=odata,
328
dsrc = self._get_ds(data)
329
ret = dsrc.get_data()
332
for mypk in mypklist:
333
self.assertIn(mypk['value'], dsrc.metadata['public-keys'])
302
335
def test_disabled_bounce(self):
384
417
self.assertTrue(os.path.exists(ovf_env_path))
385
418
self.xml_equals(xml, load_file(ovf_env_path))
387
def test_existing_ovf_same(self):
388
# waagent/SharedConfig left alone if found ovf-env.xml same as cached
389
odata = {'UserData': base64.b64encode("SOMEUSERDATA")}
390
data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
392
populate_dir(self.waagent_d,
393
{'ovf-env.xml': data['ovfcontent'],
394
'otherfile': 'otherfile-content',
395
'SharedConfig.xml': 'mysharedconfig'})
397
dsrc = self._get_ds(data)
398
ret = dsrc.get_data()
400
self.assertTrue(os.path.exists(
401
os.path.join(self.waagent_d, 'ovf-env.xml')))
402
self.assertTrue(os.path.exists(
403
os.path.join(self.waagent_d, 'otherfile')))
404
self.assertTrue(os.path.exists(
405
os.path.join(self.waagent_d, 'SharedConfig.xml')))
407
def test_existing_ovf_diff(self):
408
# waagent/SharedConfig must be removed if ovfenv is found elsewhere
410
# 'get_data' should remove SharedConfig.xml in /var/lib/waagent
411
# if ovf-env.xml differs.
412
cached_ovfenv = construct_valid_ovf_env(
413
{'userdata': base64.b64encode("FOO_USERDATA")})
414
new_ovfenv = construct_valid_ovf_env(
415
{'userdata': base64.b64encode("NEW_USERDATA")})
417
populate_dir(self.waagent_d,
418
{'ovf-env.xml': cached_ovfenv,
419
'SharedConfig.xml': "mysharedconfigxml",
420
'otherfile': 'otherfilecontent'})
422
dsrc = self._get_ds({'ovfcontent': new_ovfenv})
423
ret = dsrc.get_data()
425
self.assertEqual(dsrc.userdata_raw, "NEW_USERDATA")
426
self.assertTrue(os.path.exists(
427
os.path.join(self.waagent_d, 'otherfile')))
429
os.path.exists(os.path.join(self.waagent_d, 'SharedConfig.xml')))
431
os.path.exists(os.path.join(self.waagent_d, 'ovf-env.xml')))
432
new_xml = load_file(os.path.join(self.waagent_d, 'ovf-env.xml'))
433
self.xml_equals(new_ovfenv, new_xml)
435
421
class TestReadAzureOvf(MockerTestCase):
436
422
def test_invalid_xml_raises_non_azure_ds(self):
437
423
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
438
424
self.assertRaises(DataSourceAzure.BrokenAzureDataSource,
439
DataSourceAzure.read_azure_ovf, invalid_xml)
425
DataSourceAzure.read_azure_ovf, invalid_xml)
441
427
def test_load_with_pubkeys(self):
442
mypklist = [{'fingerprint': 'fp1', 'path': 'path1'}]
443
pubkeys = [(x['fingerprint'], x['path']) for x in mypklist]
428
mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
429
pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
444
430
content = construct_valid_ovf_env(pubkeys=pubkeys)
445
(_md, _ud, cfg) = DataSourceAzure.read_azure_ovf(content)
431
(_, _, cfg) = DataSourceAzure.read_azure_ovf(content)
446
432
for mypk in mypklist:
447
433
self.assertIn(mypk, cfg['_pubkeys'])
450
class TestReadAzureSharedConfig(MockerTestCase):
451
def test_valid_content(self):
452
xml = """<?xml version="1.0" encoding="utf-8"?>
454
<Deployment name="MY_INSTANCE_ID">
455
<Service name="myservice"/>
456
<ServiceInstance name="INSTANCE_ID.0" guid="{abcd-uuid}" />
458
<Incarnation number="1"/>
460
ret = DataSourceAzure.iid_from_shared_config_content(xml)
461
self.assertEqual("MY_INSTANCE_ID", ret)
464
436
def apply_patches(patches):
466
438
for (ref, name, replace) in patches: