2
"""Tests for the autoload-credentials command."""
4
from __future__ import print_function
7
from collections import (
11
from contextlib import contextmanager
18
from textwrap import dedent
22
from deploy_stack import BootstrapManager
27
add_basic_testing_arguments,
37
log = logging.getLogger("assess_autoload_credentials")
40
# Store details for querying the interactive command.
41
# cloud_listing: String response for choosing credential to save
42
# save_name: String response in which to save the credential under.
43
ExpectAnswers = namedtuple('ExpectAnswers', ['cloud_listing', 'save_name'])
45
# Store details for setting up a clouds credentials as well as what to compare
47
# env_var_changes: dict
48
# expected_details: dict
49
# expect_answers: ExpectAnswers object
50
CloudDetails = namedtuple(
52
['env_var_changes', 'expected_details', 'expect_answers']
56
class CredentialIdCounter:
57
_counter = defaultdict(itertools.count)
60
def id(cls, provider_name):
61
return cls._counter[provider_name].next()
64
def assess_autoload_credentials(args):
66
'ec2': [('AWS using environment variables', aws_envvar_test_details),
67
('AWS using credentials file', aws_directory_test_details)],
69
[('OS using environment variables', openstack_envvar_test_details),
70
('OS using credentials file', openstack_directory_test_details)],
71
'gce': [('GCE using envvar with credentials file',
72
gce_envvar_with_file_test_details),
73
('GCE using credentials file',
74
gce_file_test_details)],
77
client = client_from_config(args.env, args.juju_bin, False)
78
client.env.load_yaml()
79
real_credential_details = client_credentials_to_details(client)
80
provider = client.env.config['type']
82
for scenario_name, scenario_setup in test_scenarios[provider]:
83
log.info('* Starting test scenario: {}'.format(scenario_name))
84
ensure_autoload_credentials_stores_details(client, scenario_setup)
86
for scenario_name, scenario_setup in test_scenarios[provider]:
88
'* Starting [overwrite] test, scenario: {}'.format(scenario_name))
89
ensure_autoload_credentials_overwrite_existing(
90
client, scenario_setup)
92
bs_manager = BootstrapManager.from_args(args)
93
autoload_and_bootstrap(bs_manager, args.upload_tools,
94
real_credential_details, scenario_setup)
97
def client_credentials_to_details(client):
98
"""Convert the credentials in the client to details."""
99
provider = client.env.config['type']
100
log.info("provider: {}".format(provider))
101
cloud_name = client.env.get_cloud()
102
log.info("cloud_name: {}".format(cloud_name))
103
credentials = client.env.get_cloud_credentials()
104
if 'ec2' == provider:
105
return {'secret_key': credentials['secret-key'],
106
'access_key': credentials['access-key'],
108
if 'gce' == provider:
109
return {'client_id': credentials['client-id'],
110
'client_email': credentials['client-email'],
111
'private_key': credentials['private-key'],
113
if 'openstack' == provider:
114
os_cloud = client.env.clouds['clouds'][cloud_name]
115
return {'os_tenant_name': credentials['tenant-name'],
116
'os_password': credentials['password'],
117
'os_region_name': client.env.config['region'],
118
'os_auth_url': os_cloud['endpoint'],
123
def begin_autoload_test(client_base):
124
client = client_base.clone(env=client_base.env.clone())
125
with temp_dir() as tmp_dir:
126
tmp_juju_home = tempfile.mkdtemp(dir=tmp_dir)
127
tmp_scratch_dir = tempfile.mkdtemp(dir=tmp_dir)
128
client.env.juju_home = tmp_juju_home
129
client.env.load_yaml()
130
yield client, tmp_scratch_dir
133
def ensure_autoload_credentials_stores_details(client_base, cloud_details_fn):
134
"""Test covering loading and storing credentials using autoload-credentials
136
:param client: EnvJujuClient object to use for the test run.
137
:param cloud_details_fn: A callable that takes the 3 arguments `user`
138
string, `tmp_dir` path string and client EnvJujuClient and will returns a
139
`CloudDetails` object used to setup creation of credential details &
140
comparison of the result.
143
user = 'testing-user'
144
with begin_autoload_test(client_base) as (client, tmp_scratch_dir):
145
cloud_details = cloud_details_fn(user, tmp_scratch_dir, client)
147
run_autoload_credentials(
149
cloud_details.env_var_changes,
150
cloud_details.expect_answers)
152
client.env.load_yaml()
154
assert_credentials_contains_expected_results(
155
client.env.credentials,
156
cloud_details.expected_details)
159
def ensure_autoload_credentials_overwrite_existing(client_base,
161
"""Storing credentials using autoload-credentials must overwrite existing.
163
:param client: EnvJujuClient object to use for the test run.
164
:param cloud_details_fn: A callable that takes the 3 arguments `user`
165
string, `tmp_dir` path string and client EnvJujuClient and will returns a
166
`CloudDetails` object used to setup creation of credential details &
167
comparison of the result.
170
user = 'testing-user'
171
with begin_autoload_test(client_base) as (client, tmp_scratch_dir):
172
initial_details = cloud_details_fn(
173
user, tmp_scratch_dir, client)
175
run_autoload_credentials(
177
initial_details.env_var_changes,
178
initial_details.expect_answers)
180
# Now run again with a second lot of details.
181
overwrite_details = cloud_details_fn(user, tmp_scratch_dir, client)
184
overwrite_details.expected_details ==
185
initial_details.expected_details):
187
'Attempting to use identical values for overwriting')
189
run_autoload_credentials(
191
overwrite_details.env_var_changes,
192
overwrite_details.expect_answers)
194
client.env.load_yaml()
196
assert_credentials_contains_expected_results(
197
client.env.credentials,
198
overwrite_details.expected_details)
201
def autoload_and_bootstrap(bs_manager, upload_tools, real_credentials,
203
"""Ensure we can bootstrap after autoloading credentials."""
204
with begin_autoload_test(bs_manager.client) as (client_na,
206
# Do not overwrite real JUJU_DATA/JUJU_HOME/cloud-city dir.
207
bs_manager.client.env.juju_home = client_na.env.juju_home
208
bs_manager.tear_down_client.env.juju_home = client_na.env.juju_home
209
# Openstack needs the real username.
210
user = client_na.env.config.get('username', 'testing-user')
211
cloud_details = cloud_details_fn(
212
user, tmp_scratch_dir, bs_manager.client, real_credentials)
213
# Reset the client's credentials before autoload.
214
bs_manager.client.env.credentials = {}
216
with bs_manager.top_context() as machines:
217
with bs_manager.bootstrap_context(
219
omit_config=bs_manager.client.bootstrap_replaces):
220
run_autoload_credentials(
222
cloud_details.env_var_changes,
223
cloud_details.expect_answers)
224
bs_manager.client.env.load_yaml()
226
bs_manager.client.bootstrap(
227
upload_tools=upload_tools,
228
bootstrap_series=bs_manager.series,
230
bs_manager.client.kill_controller()
233
def assert_credentials_contains_expected_results(credentials, expected):
234
if credentials != expected:
236
'Actual credentials do not match expected credentials.\n'
237
'Expected: {expected}\nGot: {got}\n'.format(
240
log.info('PASS: credentials == expected')
243
def run_autoload_credentials(client, envvars, answers):
244
"""Execute the command 'juju autoload-credentials'.
246
Simple interaction, calls juju autoload-credentials selects the first
247
option and then quits.
249
:param client: EnvJujuClient from which juju will be called.
250
:param envvars: Dictionary containing environment variables to be used
252
:param answers: ExpectAnswers object containing answers for the interactive
256
process = client.expect(
257
'autoload-credentials', extra_env=envvars, include_e=False)
258
process.expect('.*1. {} \(.*\).*'.format(answers.cloud_listing))
259
process.sendline('1')
262
'(Select the cloud it belongs to|Enter cloud to which the credential)'
264
process.sendline(answers.save_name)
266
'Saved {listing_display} to cloud {save_name}'.format(
267
listing_display=answers.cloud_listing,
268
save_name=answers.save_name))
269
process.sendline('q')
270
process.expect(pexpect.EOF)
272
if process.isalive():
273
log.debug('juju process is still running: {}'.format(str(process)))
274
process.terminate(force=True)
275
raise AssertionError('juju process failed to terminate')
278
def aws_envvar_test_details(user, tmp_dir, client, credential_details=None):
279
"""client is un-used for AWS"""
280
credential_details = credential_details or aws_credential_dict_generator()
281
access_key = credential_details['access_key']
282
secret_key = credential_details['secret_key']
283
env_var_changes = get_aws_environment(user, access_key, secret_key)
285
answers = ExpectAnswers(
286
cloud_listing='aws credential "{}"'.format(user),
289
expected_details = get_aws_expected_details_dict(
290
user, access_key, secret_key)
292
return CloudDetails(env_var_changes, expected_details, answers)
295
def aws_directory_test_details(user, tmp_dir, client, credential_details=None):
296
"""client is un-used for AWS"""
297
credential_details = credential_details or aws_credential_dict_generator()
298
access_key = credential_details['access_key']
299
secret_key = credential_details['secret_key']
300
expected_details = get_aws_expected_details_dict(
301
user, access_key, secret_key)
303
write_aws_config_file(user, tmp_dir, access_key, secret_key)
305
answers = ExpectAnswers(
306
cloud_listing='aws credential "{}"'.format(user),
309
env_var_changes = dict(HOME=tmp_dir)
311
return CloudDetails(env_var_changes, expected_details, answers)
314
def get_aws_expected_details_dict(cloud_name, access_key, secret_key):
315
# Build credentials yaml file-like datastructure.
320
'auth-type': 'access-key',
321
'access-key': access_key,
322
'secret-key': secret_key,
329
def get_aws_environment(user, access_key, secret_key):
330
"""Return a dictionary containing keys suitable for AWS env vars."""
333
AWS_ACCESS_KEY_ID=access_key,
334
AWS_SECRET_ACCESS_KEY=secret_key)
337
def write_aws_config_file(user, tmp_dir, access_key, secret_key):
338
"""Write aws credentials file to tmp_dir
340
:return: String path of created credentials file.
343
config_dir = os.path.join(tmp_dir, '.aws')
344
config_file = os.path.join(config_dir, 'credentials')
345
ensure_dir(config_dir)
347
config_contents = dedent("""\
350
aws_secret_access_key={}
351
""".format(user, access_key, secret_key))
353
with open(config_file, 'w') as f:
354
f.write(config_contents)
359
def aws_credential_dict_generator():
360
call_id = CredentialIdCounter.id('aws')
361
creds = 'aws-credentials-{}'.format(call_id)
367
def openstack_envvar_test_details(
368
user, tmp_dir, client, credential_details=None):
369
if credential_details is None:
370
region = client.env.config['region']
372
'Generating credential_details for openstack {}'.format(region))
373
credential_details = openstack_credential_dict_generator(region)
375
expected_details, answers = setup_basic_openstack_test_details(
376
client, user, credential_details)
377
env_var_changes = get_openstack_envvar_changes(user, credential_details)
378
return CloudDetails(env_var_changes, expected_details, answers)
381
def get_openstack_envvar_changes(user, credential_details):
385
OS_PASSWORD=credential_details['os_password'],
386
OS_TENANT_NAME=credential_details['os_tenant_name'],
387
OS_AUTH_URL=credential_details['os_auth_url'],
388
OS_REGION_NAME=credential_details['os_region_name'],
392
def openstack_directory_test_details(user, tmp_dir, client,
393
credential_details=None):
394
if credential_details is None:
395
region = client.env.config['region']
397
'Generating credential_details for openstack {}'.format(region))
398
credential_details = openstack_credential_dict_generator(region)
400
expected_details, answers = setup_basic_openstack_test_details(
401
client, user, credential_details)
402
write_openstack_config_file(tmp_dir, user, credential_details)
403
env_var_changes = dict(HOME=tmp_dir)
405
return CloudDetails(env_var_changes, expected_details, answers)
408
def setup_basic_openstack_test_details(client, user, credential_details):
409
ensure_openstack_personal_cloud_exists(client)
410
expected_details = get_openstack_expected_details_dict(
411
user, credential_details)
412
answers = ExpectAnswers(
413
cloud_listing='openstack region ".*" project "{}" user "{}"'.format(
414
credential_details['os_tenant_name'],
416
save_name='testing-openstack')
418
return expected_details, answers
421
def write_openstack_config_file(tmp_dir, user, credential_details):
422
credentials_file = os.path.join(tmp_dir, '.novarc')
423
with open(credentials_file, 'w') as f:
424
credentials = dedent("""\
425
export OS_USERNAME={user}
426
export OS_PASSWORD={password}
427
export OS_TENANT_NAME={tenant_name}
428
export OS_AUTH_URL={auth_url}
429
export OS_REGION_NAME={region}
432
password=credential_details['os_password'],
433
tenant_name=credential_details['os_tenant_name'],
434
auth_url=credential_details['os_auth_url'],
435
region=credential_details['os_region_name'],
438
return credentials_file
441
def ensure_openstack_personal_cloud_exists(client):
442
juju_home = client.env.juju_home
443
if not juju_home.startswith('/tmp'):
444
raise ValueError('JUJU_HOME is wrongly set to: {}'.format(juju_home))
445
if client.env.clouds['clouds']:
446
cloud_name = client.env.get_cloud()
447
regions = client.env.clouds['clouds'][cloud_name]['regions']
449
regions = {'region1': {}}
451
'testing-openstack': {
453
'auth-types': ['userpass'],
454
'endpoint': client.env.config['auth-url'],
458
client.env.clouds['clouds'] = os_cloud
459
client.env.dump_yaml(juju_home, config=None)
462
def get_openstack_expected_details_dict(user, credential_details):
465
'testing-openstack': {
466
'default-region': credential_details['os_region_name'],
468
'auth-type': 'userpass',
470
'password': credential_details['os_password'],
471
'tenant-name': credential_details['os_tenant_name'],
479
def openstack_credential_dict_generator(region):
480
call_id = CredentialIdCounter.id('openstack')
481
creds = 'openstack-credentials-{}'.format(call_id)
483
os_tenant_name=creds,
485
os_auth_url='https://keystone.example.com:443/v2.0/',
486
os_region_name=region)
489
def gce_envvar_with_file_test_details(user, tmp_dir, client,
490
credential_details=None):
491
if credential_details is None:
492
credential_details = gce_credential_dict_generator()
493
credentials_path = write_gce_config_file(tmp_dir, credential_details)
495
answers = ExpectAnswers(
496
cloud_listing='google credential "{}"'.format(
497
credential_details['client_email']),
500
expected_details = get_gce_expected_details_dict(user, credentials_path)
502
env_var_changes = dict(
504
GOOGLE_APPLICATION_CREDENTIALS=credentials_path,
507
return CloudDetails(env_var_changes, expected_details, answers)
510
def gce_file_test_details(user, tmp_dir, client, credential_details=None):
511
if credential_details is None:
512
credential_details = gce_credential_dict_generator()
514
home_path, credentials_path = write_gce_home_config_file(
515
tmp_dir, credential_details)
517
answers = ExpectAnswers(
518
cloud_listing='google credential "{}"'.format(
519
credential_details['client_email']),
522
expected_details = get_gce_expected_details_dict(user, credentials_path)
524
env_var_changes = dict(USER=user, HOME=home_path)
526
return CloudDetails(env_var_changes, expected_details, answers)
529
def write_gce_config_file(tmp_dir, credential_details, filename=None):
532
type='service_account',
533
client_id=credential_details['client_id'],
534
client_email=credential_details['client_email'],
535
private_key=credential_details['private_key'])
537
# Generate a unique filename if none provided as this is stored and used in
539
filename = filename or 'gce-file-config-{}.json'.format(
540
CredentialIdCounter.id('gce-fileconfig'))
541
credential_file = os.path.join(tmp_dir, filename)
542
with open(credential_file, 'w') as f:
543
json.dump(details, f)
545
return credential_file
548
def write_gce_home_config_file(tmp_dir, credential_details):
549
"""Returns a tuple contining a new HOME path and credential file path."""
550
# Add a unique string for home dir so each file path is unique within the
551
# stored credentials file.
552
home_dir = os.path.join(tmp_dir, 'gce-homedir-{}'.format(
553
CredentialIdCounter.id('gce-homedir')))
554
credential_path = os.path.join(home_dir, '.config', 'gcloud')
555
os.makedirs(credential_path)
557
written_credentials_path = write_gce_config_file(
560
'application_default_credentials.json')
562
return home_dir, written_credentials_path
565
def get_gce_expected_details_dict(user, credentials_path):
570
'auth-type': 'jsonfile',
571
'file': credentials_path,
578
def gce_credential_dict_generator():
579
call_id = CredentialIdCounter.id('gce')
580
creds = 'gce-credentials-{}'.format(call_id)
583
client_email='{}@example.com'.format(creds),
588
def parse_args(argv):
589
"""Parse all arguments."""
590
parser = argparse.ArgumentParser(
591
description="Test autoload-credentials command.")
592
add_basic_testing_arguments(parser)
593
return parser.parse_args(argv)
597
args = parse_args(argv)
598
configure_logging(args.verbose)
600
assess_autoload_credentials(args)
604
if __name__ == '__main__':