1
"""Tests for assess_autoload_credentials module."""
3
from argparse import Namespace
9
from textwrap import dedent
13
import assess_autoload_credentials as aac
14
from deploy_stack import BootstrapManager
19
from tests.test_jujupy import fake_juju_client
20
from utility import temp_dir
23
class TestParseArgs(TestCase):
25
def test_common_args(self):
26
args = aac.parse_args(['env', '/bin/juju'])
27
self.assertEqual('env', args.env)
28
self.assertEqual('/bin/juju', args.juju_bin)
31
fake_stdout = StringIO.StringIO()
32
with parse_error(self) as fake_stderr:
33
with patch('sys.stdout', fake_stdout):
34
aac.parse_args(['--help'])
35
self.assertEqual('', fake_stderr.getvalue())
37
'Test autoload-credentials command.', fake_stdout.getvalue())
39
def test_verbose_is_set_to_debug_when_passed_verbose(self):
40
args = aac.parse_args(['/bin/juju', '--verbose'])
41
self.assertEqual(logging.DEBUG, args.verbose)
43
def test_verbose_default_values(self):
45
juju_bin = '/bin/juju'
46
temp_env_name = 'functional-autoload-credentials'
47
with temp_dir() as log:
48
args = aac.parse_args([env, juju_bin, log, temp_env_name])
51
Namespace(agent_stream=None, agent_url=None, bootstrap_host=None,
52
debug=False, deadline=None, env='env',
53
juju_bin='/bin/juju', keep_env=False, logs=log,
54
machine=[], region=None, series=None,
55
temp_env_name='functional-autoload-credentials',
56
upload_tools=False, verbose=logging.INFO,
60
class TestCredentialIdCounter(TestCase):
63
# Make sure CredentialIdCounter is reset to initial values
64
aac.CredentialIdCounter._counter.clear()
66
def test_returns_zero_for_new_id(self):
67
self.assertEqual(aac.CredentialIdCounter.id('test'), 0)
69
def test_returns_iterations_for_same_id(self):
71
aac.CredentialIdCounter.id('test') for x in xrange(3)
73
self.assertEqual(generated_ids, [0, 1, 2])
75
def test_returns_new_ids_for_multiple_names(self):
76
self.assertEqual(aac.CredentialIdCounter.id('test'), 0)
77
self.assertEqual(aac.CredentialIdCounter.id('another_test'), 0)
78
self.assertEqual(aac.CredentialIdCounter.id('test'), 1)
79
self.assertEqual(aac.CredentialIdCounter.id('another_test'), 1)
80
self.assertEqual(aac.CredentialIdCounter.id('test'), 2)
83
class TestAWSHelpers(TestCase):
85
def test_credential_dict_generator_returns_different_details(self):
86
"""Each call must return unique details each time."""
87
first_details = aac.aws_credential_dict_generator()
88
second_details = aac.aws_credential_dict_generator()
90
self.assertNotEqual(first_details, second_details)
92
def test_get_aws_environment_supplies_all_keys(self):
93
access_key = 'access_key'
94
secret_key = 'secret_key'
97
env = aac.get_aws_environment(username, access_key, secret_key)
103
AWS_ACCESS_KEY_ID=access_key,
104
AWS_SECRET_ACCESS_KEY=secret_key))
106
def test_aws_envvar_test_details_returns_correct_expected_details(self):
107
access_key = 'test_access_key'
108
secret_key = 'test_secret_key'
110
cloud_details = aac.aws_envvar_test_details(
115
'access_key': access_key,
116
'secret_key': secret_key})
118
self.assertDictEqual(
119
cloud_details.expected_details, {
123
'auth-type': 'access-key',
124
'access-key': access_key,
125
'secret-key': secret_key,
131
def test_aws_envvar_test_details_returns_correct_envvar_settings(self):
132
access_key = 'test_access_key'
133
secret_key = 'test_secret_key'
135
cloud_details = aac.aws_envvar_test_details(
140
'access_key': access_key,
141
'secret_key': secret_key})
143
self.assertDictEqual(
144
cloud_details.env_var_changes,
147
AWS_ACCESS_KEY_ID=access_key,
148
AWS_SECRET_ACCESS_KEY=secret_key))
150
def test_aws_directory_test_details_returns_correct_expected_details(self):
151
access_key = 'test_access_key'
152
secret_key = 'test_secret_key'
154
with patch.object(aac, 'write_aws_config_file'):
155
cloud_details = aac.aws_directory_test_details(
160
'access_key': access_key, 'secret_key': secret_key})
162
self.assertDictEqual(
163
cloud_details.expected_details, {
167
'auth-type': 'access-key',
168
'access-key': access_key,
169
'secret-key': secret_key,
175
def test_aws_directory_test_details_returns_envvar_settings(self):
176
with patch.object(aac, 'write_aws_config_file'):
177
cloud_details = aac.aws_directory_test_details(
181
self.assertDictEqual(
182
cloud_details.env_var_changes,
183
dict(HOME='tmp_dir'))
185
def test_write_aws_config_file_writes_credentials_file(self):
186
"""Ensure the file created contains the correct details."""
187
user = 'different-user'
188
access_key = 'access_key'
189
secret_key = 'secret_key'
191
with temp_dir() as tmp_dir:
192
credentials_file = aac.write_aws_config_file(
193
user, tmp_dir, access_key, secret_key)
194
credentials = ConfigParser.ConfigParser()
195
with open(credentials_file, 'r') as f:
196
credentials.readfp(f)
199
('aws_access_key_id', access_key),
200
('aws_secret_access_key', secret_key)]
202
self.assertEqual(credentials.sections(), [user])
204
credentials.items(user), expected_items)
207
class TestOpenStackHelpers(TestCase):
209
def test_credential_dict_generator_returns_different_details(self):
210
"""Each call must return uniquie details each time."""
211
first_details = aac.openstack_credential_dict_generator('region1')
212
second_details = aac.openstack_credential_dict_generator('region1')
214
self.assertNotEqual(first_details, second_details)
216
def test_expected_details_dict_returns_correct_values(self):
217
os_username = 'username'
218
os_password = 'password'
219
os_tenant_name = 'tenant name'
221
os_region_name = 'region'
222
expected_details = aac.get_openstack_expected_details_dict(
224
'os_password': os_password,
225
'os_tenant_name': os_tenant_name,
226
'os_auth_url': os_auth_url,
227
'os_region_name': os_region_name,
228
'os_user_name': os_username,
233
'testing-openstack': {
234
'default-region': 'region',
236
'auth-type': 'userpass',
238
'password': os_password,
239
'tenant-name': os_tenant_name,
240
'username': os_username
246
def test_get_openstack_envvar_changes_returns_correct_values(self):
248
os_password = 'password'
249
os_tenant_name = 'tenant name'
251
os_region_name = 'region'
252
env_var_changes = aac.get_openstack_envvar_changes(
254
'os_password': os_password,
255
'os_tenant_name': os_tenant_name,
256
'os_auth_url': os_auth_url,
257
'os_region_name': os_region_name,
264
'OS_PASSWORD': os_password,
265
'OS_TENANT_NAME': os_tenant_name,
266
'OS_AUTH_URL': os_auth_url,
267
'OS_REGION_NAME': os_region_name,
270
def test_write_openstack_config_file_writes_credentials_file(self):
271
"""Ensure the file created contains the correct details."""
272
credential_details = dict(
273
os_tenant_name='tenant_name',
274
os_password='password',
276
os_region_name='region')
279
with temp_dir() as tmp_dir:
280
credentials_file = aac.write_openstack_config_file(
281
tmp_dir, user, credential_details)
282
with open(credentials_file, 'r') as f:
283
credential_contents = f.read()
285
expected = dedent("""\
286
export OS_USERNAME={user}
287
export OS_PASSWORD={password}
288
export OS_TENANT_NAME={tenant_name}
289
export OS_AUTH_URL={auth_url}
290
export OS_REGION_NAME={region}
293
password=credential_details['os_password'],
294
tenant_name=credential_details['os_tenant_name'],
295
auth_url=credential_details['os_auth_url'],
296
region=credential_details['os_region_name'],
299
self.assertEqual(credential_contents, expected)
302
class TestGCEHelpers(TestCase):
303
def test_get_gce_expected_details_dict_returns_correct_details(self):
305
cred_path = '/some/path'
307
aac.get_gce_expected_details_dict(user, cred_path),
312
'auth-type': 'jsonfile',
319
def test_gce_credential_dict_generator_returns_unique_details(self):
321
aac.gce_credential_dict_generator(),
322
aac.gce_credential_dict_generator())
324
def test_write_gce_config_file_creates_unique_credential_file(self):
326
client_id='client_id',
327
client_email='client_email',
328
private_key='private_key',
331
with patch.object(aac.CredentialIdCounter, 'id') as id_gen:
332
id_gen.return_value = 0
333
with temp_dir() as tmp_dir:
334
file_path = aac.write_gce_config_file(tmp_dir, credentials)
337
os.path.join(tmp_dir, 'gce-file-config-{}.json'.format(0)))
339
def test_write_gce_config_file_creates_named_credential_file(self):
341
client_id='client_id',
342
client_email='client_email',
343
private_key='private_key',
346
with temp_dir() as tmp_dir:
347
file_path = aac.write_gce_config_file(
348
tmp_dir, credentials, 'file_name')
349
self.assertEqual(file_path, os.path.join(tmp_dir, 'file_name'))
351
def test_credential_generator_returns_correct_formats(self):
352
"""Three items are needed, one of them must be an email address."""
353
with patch.object(aac.CredentialIdCounter, 'id') as id_gen:
354
id_gen.return_value = 0
355
details = aac.gce_credential_dict_generator()
357
self.assertEqual(details['private_key'], 'gce-credentials-0')
358
self.assertEqual(details['client_id'], 'gce-credentials-0')
360
details['client_email'], 'gce-credentials-0@example.com')
363
class TestAssertCredentialsContainsExpectedResults(TestCase):
365
def test_does_not_raise_when_credentials_match(self):
366
cred_actual = dict(key='value')
367
cred_expected = dict(key='value')
369
aac.assert_credentials_contains_expected_results(
370
cred_actual, cred_expected)
372
def test_raises_when_credentials_do_not_match(self):
373
cred_actual = dict(key='value')
374
cred_expected = dict(key='value', another_key='extra')
378
aac.assert_credentials_contains_expected_results,
383
def bogus_credentials():
384
"""return an client with unusable crednetials.
386
It uses an openstack config to match the fake_juju.AutoloadCredentials.
388
client = fake_juju_client()
389
client.env.config['type'] = 'openstack'
390
client.env.config['auth-url'] = 'url'
391
client.env.config['region'] = 'region'
392
client.env.credentials = {
393
'credentials': {'bogus': {}}}
397
class TestEnsureAutoloadCredentialsStoresDetails(TestCase):
399
def test_existing_credentials_openstack(self):
401
aac.ensure_autoload_credentials_stores_details(
402
bogus_credentials(), aac.openstack_envvar_test_details)
405
class TestEnsureAutoloadCredentialsOverwriteExisting(TestCase):
407
def test_overwrite_existing(self):
408
aac.ensure_autoload_credentials_overwrite_existing(
409
bogus_credentials(), aac.openstack_envvar_test_details)
412
class TestAutoloadAndBootstrap(TestCase):
414
def test_autoload_and_bootstrap(self):
416
def cloud_details_fn(user, tmp_dir, client, credential_details):
417
return aac.CloudDetails(credential_details, None, None)
419
client = fake_juju_client()
421
real_credential_details = {'cloud-username': 'user',
422
'cloud-password': 'password'
424
credential_file_details = {'credentials': {'cloud': {
425
'credentials': real_credential_details
428
def write_credentials(*args, **kwargs):
429
file_name = os.path.join(client.env.juju_home, 'credentials.yaml')
430
with open(file_name, 'w') as write_file:
431
yaml.safe_dump(credential_file_details, write_file)
433
def credential_check(*args, **kwargs):
434
self.assertEqual(client.env.credentials, credential_file_details)
436
with temp_dir() as log_dir:
437
bs_manager = BootstrapManager(
438
'env', client, client, None, [], None, None, None, None,
439
log_dir, False, True, True)
440
with patch('assess_autoload_credentials.run_autoload_credentials',
442
side_effect=write_credentials) as run_autoload_mock:
444
bs_manager.client, 'bootstrap',
446
side_effect=credential_check) as bootstrap_mock:
447
aac.autoload_and_bootstrap(bs_manager, upload_tools,
448
real_credential_details,
450
run_autoload_mock.assert_called_once_with(
451
client, real_credential_details, None)
452
bootstrap_mock.assert_called_once_with(False, bootstrap_series=None,
453
credential='testing-user')