50
50
self.m.StubOutWithMock(kc_v3, "Client")
51
51
self.m.StubOutWithMock(ks_auth_v3, 'Password')
52
52
self.m.StubOutWithMock(ks_token_endpoint, 'Token')
53
self.m.StubOutWithMock(context, '_AccessInfoPlugin')
53
self.m.StubOutWithMock(ks_auth_access, 'AccessInfoPlugin')
55
55
dummy_url = 'http://server.test:5000/v2.0'
56
56
cfg.CONF.set_override('auth_uri', dummy_url,
69
69
def _clear_domain_override(self):
70
70
cfg.CONF.clear_override('stack_user_domain_id')
72
def _stub_admin_client(self, auth_ok=True):
74
auth_url='http://server.test:5000/v3',
77
endpoint='http://server.test:5000/v3',
80
password='verybadpass',
81
project_name='service',
82
username='heat').AndReturn(self.mock_admin_client)
83
self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
72
def _stub_admin_auth(self, auth_ok=True):
73
mock_ks_auth = self.m.CreateMockAnything()
75
a = mock_ks_auth.get_user_id(mox.IsA(ks_session.Session))
85
self.mock_admin_client.authenticate().AndReturn(auth_ok)
86
self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
87
self.mock_admin_client.auth_ref.user_id = '1234'
89
self.mock_admin_client.authenticate().AndRaise(
90
kc_exception.Unauthorized)
92
def _stub_domain_admin_client(self, auth_ok=True):
94
auth_url='http://server.test:5000/v3',
97
endpoint='http://server.test:5000/v3',
100
password='adminsecret',
101
user_domain_id='adomain123',
102
username='adminuser123').AndReturn(self.mock_admin_client)
79
a.AndRaise(kc_exception.Unauthorized)
81
m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
82
password='verybadpass',
83
user_domain_id='default',
85
m.AndReturn(mock_ks_auth)
87
def _stub_domain_admin_client(self, domain_id=None):
88
mock_ks_auth = self.m.CreateMockAnything()
89
mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn('tok')
91
m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
92
password='adminsecret',
93
domain_id='adomain123',
95
user_domain_id='adomain123',
96
user_domain_name=None,
97
username='adminuser123')
98
m.AndReturn(mock_ks_auth)
100
n = kc_v3.Client(session=mox.IsA(ks_session.Session),
102
n.AndReturn(self.mock_admin_client)
103
104
self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
104
self.mock_admin_client.authenticate(
105
domain_id='adomain123').AndReturn(auth_ok)
107
self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
108
self.mock_admin_client.auth_ref.user_id = '1234'
109
self.mock_admin_client.auth_ref.domain_id = 'adomain123'
111
106
def _stubs_v3(self, method='token', trust_scoped=True,
112
user_id='trustor_user_id', auth_ref=None, client=True,
107
user_id=None, auth_ref=None, client=True, project_id=None,
108
stub_trust_context=False):
114
109
mock_auth_ref = self.m.CreateMockAnything()
115
110
mock_ks_auth = self.m.CreateMockAnything()
118
113
p = ks_token_endpoint.Token(token='abcd1234',
119
114
endpoint='http://server.test:5000/v3')
120
115
elif method == 'auth_ref':
121
p = context._AccessInfoPlugin('http://server.test:5000/v3',
122
mox.IsA(ks_access.AccessInfo))
116
p = ks_auth_access.AccessInfoPlugin(
117
auth_url='http://server.test:5000/v3',
118
auth_ref=mox.IsA(ks_access.AccessInfo))
124
120
elif method == 'password':
125
121
p = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
126
122
username='test_username',
127
123
password='password',
128
project_id='test_tenant_id',
124
project_id=project_id or 'test_tenant_id',
129
125
user_domain_id='default')
131
127
elif method == 'trust':
146
143
auth=mock_ks_auth)
147
144
c.AndReturn(self.mock_ks_v3_client)
149
for x in xrange(0, times):
150
m = mock_ks_auth.get_access(mox.IsA(ks_session.Session))
151
m.AndReturn(mock_auth_ref)
146
if stub_trust_context:
147
m = mock_ks_auth.get_user_id(mox.IsA(ks_session.Session))
150
m = mock_ks_auth.get_project_id(mox.IsA(ks_session.Session))
151
m.AndReturn(project_id)
153
m = mock_ks_auth.get_access(mox.IsA(ks_session.Session))
154
m.AndReturn(mock_auth_ref)
153
156
return mock_ks_auth, mock_auth_ref
548
555
"""Test create_trust_context when creating a trust."""
550
self._stub_admin_client()
552
mock_auth, mock_auth_ref = self._stubs_v3(times=2)
557
self._stub_admin_auth()
559
mock_auth, mock_auth_ref = self._stubs_v3(user_id='5678',
561
stub_trust_context=True)
553
563
cfg.CONF.set_override('deferred_auth_method', 'trusts')
554
564
cfg.CONF.set_override('trusts_delegated_roles', ['heat_stack_owner'])
556
mock_auth_ref.user_id = '5678'
557
mock_auth_ref.project_id = '42'
559
566
self.mock_ks_v3_client.trusts = self.m.CreateMockAnything()
560
567
self.mock_ks_v3_client.trusts.create(
561
568
trustor_user='5678',
602
609
'"stack_domain_admin_password"')
603
610
self.assertIn(exp_msg, six.text_type(err))
605
def test_init_admin_client(self):
607
"""Test the admin_client property."""
609
self._stub_admin_client()
612
ctx = utils.dummy_context()
616
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
617
self.assertEqual(self.mock_admin_client, heat_ks_client.admin_client)
618
self.assertEqual(self.mock_admin_client, heat_ks_client._admin_client)
620
def test_init_admin_client_denied(self):
622
"""Test the admin_client property, auth failure path."""
624
self._stub_admin_client(auth_ok=False)
627
ctx = utils.dummy_context()
631
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
633
# Define wrapper for property or the property raises the exception
634
# outside of the assertRaises which fails the test
635
def get_admin_client():
636
heat_ks_client.admin_client
638
self.assertRaises(exception.AuthorizationFailure,
641
612
def test_trust_init(self):
643
614
"""Test consuming a trust when initializing."""
1367
1338
def test_stack_domain_user_token(self):
1368
1339
"""Test stack_domain_user_token function."""
1369
dummysession = self.m.CreateMockAnything()
1370
dummyresp = self.m.CreateMockAnything()
1371
dummyresp.headers = {'X-Subject-Token': 'dummytoken'}
1372
dummysession.post('http://server.test:5000/v3/auth/tokens?nocatalog',
1373
authenticated=False,
1374
headers={'Accept': 'application/json'},
1375
json=mox.IgnoreArg()).AndReturn(dummyresp)
1376
self.m.StubOutWithMock(ks_session, 'Session')
1377
ks_session.Session.construct(mox.IsA(dict)).AndReturn(dummysession)
1340
dum_tok = 'dummytoken'
1341
mock_ks_auth = self.m.CreateMockAnything()
1342
mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn(dum_tok)
1344
m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
1346
project_id='aproject',
1348
include_catalog=False)
1349
m.AndReturn(mock_ks_auth)
1378
1351
self.m.ReplayAll()
1380
1353
ctx = utils.dummy_context()
1381
1354
ctx.trust_id = None
1382
1355
heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
1383
token = heat_ks_client.stack_domain_user_token(
1384
user_id='duser', project_id='aproject', password='apassw')
1385
self.assertEqual('dummytoken', token)
1356
token = heat_ks_client.stack_domain_user_token(user_id='duser',
1357
project_id='aproject',
1359
self.assertEqual(dum_tok, token)
1387
1361
def test_stack_domain_user_token_err_nodomain(self):
1388
1362
"""Test stack_domain_user_token error path."""
1489
1463
def _clear_domain_override(self):
1490
1464
cfg.CONF.clear_override('stack_user_domain_name')
1492
def _stub_domain_admin_client(self, auth_ok=True):
1494
auth_url='http://server.test:5000/v3',
1497
endpoint='http://server.test:5000/v3',
1500
password='adminsecret',
1501
user_domain_name='fake_domain_name',
1502
username='adminuser123').AndReturn(self.mock_admin_client)
1466
def _stub_domain_admin_client_domain_get(self):
1467
dummy_domain = self.m.CreateMockAnything()
1468
dummy_domain.id = 'adomain123'
1469
self.mock_ks_v3_client_domain_mngr.list(
1470
name='fake_domain_name').AndReturn([dummy_domain])
1472
def _stub_domain_admin_client(self, domain_id='adomain123'):
1473
mock_ks_auth = self.m.CreateMockAnything()
1474
mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn('tok')
1477
a = self.m.CreateMockAnything()
1478
a.domain_id = domain_id
1479
mock_ks_auth.get_access(mox.IsA(ks_session.Session)).AndReturn(a)
1481
m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
1482
password='adminsecret',
1484
domain_name='fake_domain_name',
1485
user_domain_id=None,
1486
user_domain_name='fake_domain_name',
1487
username='adminuser123')
1489
m.AndReturn(mock_ks_auth)
1491
n = kc_v3.Client(session=mox.IsA(ks_session.Session),
1493
n.AndReturn(self.mock_admin_client)
1503
1495
self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
1504
self.mock_admin_client.authenticate(
1505
domain_name='fake_domain_name').AndReturn(auth_ok)
1507
self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
1508
self.mock_admin_client.auth_ref.user_id = '1234'
1509
self.mock_admin_client.auth_ref.domain_id = 'adomain123'
1511
1497
def _stub_domain_user_pw_auth(self):
1512
1498
ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
1583
1569
def test_create_stack_domain_user(self):
1584
1570
p = super(KeystoneClientTestDomainName, self)
1585
1571
p.test_create_stack_domain_user()
1574
class HeatClientTest(KeystoneClientTest):
1575
"""Test cases for heat.common.config"""
1578
super(HeatClientTest, self).setUp()
1580
def test_init_auth_encryption_key_length(self):
1581
"""Test for length of the auth_encryption_length in config file"""
1582
cfg.CONF.set_override('auth_encryption_key', 'abcdefghijklma')
1583
err = self.assertRaises(exception.Error,
1584
config.startup_sanity_check)
1585
exp_msg = ('heat.conf misconfigured, auth_encryption_key '
1586
'length must be 16, 24 or 32')
1587
self.assertIn(exp_msg, six.text_type(err))