~ubuntu-branches/ubuntu/wily/heat/wily

« back to all changes in this revision

Viewing changes to heat/tests/test_heatclient.py

  • Committer: Package Import Robot
  • Author(s): Corey Bryant, Corey Bryant, James Page
  • Date: 2015-07-07 17:06:19 UTC
  • mfrom: (1.1.26) (45.1.1 vivid-proposed)
  • Revision ID: package-import@ubuntu.com-20150707170619-hra2dbjpfofpou4s
Tags: 1:5.0.0~b1-0ubuntu1
[ Corey Bryant ]
* New upstream milestone for OpenStack Liberty:
  - d/control: Align (build-)depends with upstream.
  - d/p/fix-requirements.patch: Rebased.
  - d/p/sudoers_patch.patch: Rebased.

[ James Page ]
* d/s/options: Ignore any removal of egg-info data during package clean.
* d/control: Drop MySQL and PostgreSQL related BD's, not required for unit
  testing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
import uuid
16
16
 
17
17
from keystoneclient import access as ks_access
 
18
from keystoneclient.auth.identity import access as ks_auth_access
18
19
from keystoneclient.auth.identity import v3 as ks_auth_v3
19
20
from keystoneclient.auth import token_endpoint as ks_token_endpoint
20
21
import keystoneclient.exceptions as kc_exception
26
27
import six
27
28
 
28
29
from heat.common import config
29
 
from heat.common import context
30
30
from heat.common import exception
31
31
from heat.common import heat_keystoneclient
32
32
from heat.tests import common
50
50
        self.m.StubOutWithMock(kc_v3, "Client")
51
51
        self.m.StubOutWithMock(ks_auth_v3, 'Password')
52
52
        self.m.StubOutWithMock(ks_token_endpoint, 'Token')
53
 
        self.m.StubOutWithMock(context, '_AccessInfoPlugin')
 
53
        self.m.StubOutWithMock(ks_auth_access, 'AccessInfoPlugin')
54
54
 
55
55
        dummy_url = 'http://server.test:5000/v2.0'
56
56
        cfg.CONF.set_override('auth_uri', dummy_url,
69
69
    def _clear_domain_override(self):
70
70
        cfg.CONF.clear_override('stack_user_domain_id')
71
71
 
72
 
    def _stub_admin_client(self, auth_ok=True):
73
 
        kc_v3.Client(
74
 
            auth_url='http://server.test:5000/v3',
75
 
            cacert=None,
76
 
            cert=None,
77
 
            endpoint='http://server.test:5000/v3',
78
 
            insecure=False,
79
 
            key=None,
80
 
            password='verybadpass',
81
 
            project_name='service',
82
 
            username='heat').AndReturn(self.mock_admin_client)
83
 
        self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
 
72
    def _stub_admin_auth(self, auth_ok=True):
 
73
        mock_ks_auth = self.m.CreateMockAnything()
 
74
 
 
75
        a = mock_ks_auth.get_user_id(mox.IsA(ks_session.Session))
84
76
        if auth_ok:
85
 
            self.mock_admin_client.authenticate().AndReturn(auth_ok)
86
 
            self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
87
 
            self.mock_admin_client.auth_ref.user_id = '1234'
 
77
            a.AndReturn('1234')
88
78
        else:
89
 
            self.mock_admin_client.authenticate().AndRaise(
90
 
                kc_exception.Unauthorized)
91
 
 
92
 
    def _stub_domain_admin_client(self, auth_ok=True):
93
 
        kc_v3.Client(
94
 
            auth_url='http://server.test:5000/v3',
95
 
            cacert=None,
96
 
            cert=None,
97
 
            endpoint='http://server.test:5000/v3',
98
 
            insecure=False,
99
 
            key=None,
100
 
            password='adminsecret',
101
 
            user_domain_id='adomain123',
102
 
            username='adminuser123').AndReturn(self.mock_admin_client)
 
79
            a.AndRaise(kc_exception.Unauthorized)
 
80
 
 
81
        m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
 
82
                                password='verybadpass',
 
83
                                user_domain_id='default',
 
84
                                username='heat')
 
85
        m.AndReturn(mock_ks_auth)
 
86
 
 
87
    def _stub_domain_admin_client(self, domain_id=None):
 
88
        mock_ks_auth = self.m.CreateMockAnything()
 
89
        mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn('tok')
 
90
 
 
91
        m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
 
92
                                password='adminsecret',
 
93
                                domain_id='adomain123',
 
94
                                domain_name=None,
 
95
                                user_domain_id='adomain123',
 
96
                                user_domain_name=None,
 
97
                                username='adminuser123')
 
98
        m.AndReturn(mock_ks_auth)
 
99
 
 
100
        n = kc_v3.Client(session=mox.IsA(ks_session.Session),
 
101
                         auth=mock_ks_auth)
 
102
        n.AndReturn(self.mock_admin_client)
 
103
 
103
104
        self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
104
 
        self.mock_admin_client.authenticate(
105
 
            domain_id='adomain123').AndReturn(auth_ok)
106
 
        if auth_ok:
107
 
            self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
108
 
            self.mock_admin_client.auth_ref.user_id = '1234'
109
 
            self.mock_admin_client.auth_ref.domain_id = 'adomain123'
110
105
 
111
106
    def _stubs_v3(self, method='token', trust_scoped=True,
112
 
                  user_id='trustor_user_id', auth_ref=None, client=True,
113
 
                  times=1):
 
107
                  user_id=None, auth_ref=None, client=True, project_id=None,
 
108
                  stub_trust_context=False):
114
109
        mock_auth_ref = self.m.CreateMockAnything()
115
110
        mock_ks_auth = self.m.CreateMockAnything()
116
111
 
118
113
            p = ks_token_endpoint.Token(token='abcd1234',
119
114
                                        endpoint='http://server.test:5000/v3')
120
115
        elif method == 'auth_ref':
121
 
            p = context._AccessInfoPlugin('http://server.test:5000/v3',
122
 
                                          mox.IsA(ks_access.AccessInfo))
 
116
            p = ks_auth_access.AccessInfoPlugin(
 
117
                auth_url='http://server.test:5000/v3',
 
118
                auth_ref=mox.IsA(ks_access.AccessInfo))
123
119
 
124
120
        elif method == 'password':
125
121
            p = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
126
122
                                    username='test_username',
127
123
                                    password='password',
128
 
                                    project_id='test_tenant_id',
 
124
                                    project_id=project_id or 'test_tenant_id',
129
125
                                    user_domain_id='default')
130
126
 
131
127
        elif method == 'trust':
135
131
                                    user_domain_id='default',
136
132
                                    trust_id='atrust123')
137
133
 
138
 
            mock_auth_ref.user_id = user_id
 
134
            mock_auth_ref.user_id = user_id or 'trustor_user_id'
 
135
            mock_auth_ref.project_id = project_id or 'test_tenant_id'
139
136
            mock_auth_ref.trust_scoped = trust_scoped
140
137
            mock_auth_ref.auth_token = 'atrusttoken'
141
138
 
146
143
                             auth=mock_ks_auth)
147
144
            c.AndReturn(self.mock_ks_v3_client)
148
145
 
149
 
            for x in xrange(0, times):
150
 
                m = mock_ks_auth.get_access(mox.IsA(ks_session.Session))
151
 
                m.AndReturn(mock_auth_ref)
 
146
            if stub_trust_context:
 
147
                m = mock_ks_auth.get_user_id(mox.IsA(ks_session.Session))
 
148
                m.AndReturn(user_id)
 
149
 
 
150
                m = mock_ks_auth.get_project_id(mox.IsA(ks_session.Session))
 
151
                m.AndReturn(project_id)
 
152
 
 
153
            m = mock_ks_auth.get_access(mox.IsA(ks_session.Session))
 
154
            m.AndReturn(mock_auth_ref)
152
155
 
153
156
        return mock_ks_auth, mock_auth_ref
154
157
 
277
280
        ctx = utils.dummy_context()
278
281
        ctx.trust_id = None
279
282
 
280
 
        self._stub_domain_admin_client()
 
283
        self._stub_domain_admin_client(domain_id=None)
281
284
 
282
285
        # mock keystone client functions
283
286
        self.mock_admin_client.roles = self.m.CreateMockAnything()
513
516
        class MockTrust(object):
514
517
            id = 'atrust123'
515
518
 
516
 
        self._stub_admin_client()
517
 
        mock_client, mock_auth_ref = self._stubs_v3(times=2)
 
519
        self._stub_admin_auth()
 
520
        mock_ks_auth, mock_auth_ref = self._stubs_v3(user_id='5678',
 
521
                                                     project_id='42',
 
522
                                                     stub_trust_context=True)
518
523
 
519
524
        cfg.CONF.set_override('deferred_auth_method', 'trusts')
520
525
        if delegate_roles:
523
528
        trustor_roles = ['heat_stack_owner', 'admin', '__member__']
524
529
        trustee_roles = delegate_roles or trustor_roles
525
530
 
 
531
        self.mock_ks_v3_client.trusts = self.m.CreateMockAnything()
 
532
 
526
533
        mock_auth_ref.user_id = '5678'
527
534
        mock_auth_ref.project_id = '42'
528
535
 
547
554
 
548
555
        """Test create_trust_context when creating a trust."""
549
556
 
550
 
        self._stub_admin_client()
551
 
 
552
 
        mock_auth, mock_auth_ref = self._stubs_v3(times=2)
 
557
        self._stub_admin_auth()
 
558
 
 
559
        mock_auth, mock_auth_ref = self._stubs_v3(user_id='5678',
 
560
                                                  project_id='42',
 
561
                                                  stub_trust_context=True)
 
562
 
553
563
        cfg.CONF.set_override('deferred_auth_method', 'trusts')
554
564
        cfg.CONF.set_override('trusts_delegated_roles', ['heat_stack_owner'])
555
565
 
556
 
        mock_auth_ref.user_id = '5678'
557
 
        mock_auth_ref.project_id = '42'
558
 
 
559
566
        self.mock_ks_v3_client.trusts = self.m.CreateMockAnything()
560
567
        self.mock_ks_v3_client.trusts.create(
561
568
            trustor_user='5678',
602
609
                   '"stack_domain_admin_password"')
603
610
        self.assertIn(exp_msg, six.text_type(err))
604
611
 
605
 
    def test_init_admin_client(self):
606
 
 
607
 
        """Test the admin_client property."""
608
 
 
609
 
        self._stub_admin_client()
610
 
        self.m.ReplayAll()
611
 
 
612
 
        ctx = utils.dummy_context()
613
 
        ctx.username = None
614
 
        ctx.password = None
615
 
        ctx.trust_id = None
616
 
        heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
617
 
        self.assertEqual(self.mock_admin_client, heat_ks_client.admin_client)
618
 
        self.assertEqual(self.mock_admin_client, heat_ks_client._admin_client)
619
 
 
620
 
    def test_init_admin_client_denied(self):
621
 
 
622
 
        """Test the admin_client property, auth failure path."""
623
 
 
624
 
        self._stub_admin_client(auth_ok=False)
625
 
        self.m.ReplayAll()
626
 
 
627
 
        ctx = utils.dummy_context()
628
 
        ctx.username = None
629
 
        ctx.password = None
630
 
        ctx.trust_id = None
631
 
        heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
632
 
 
633
 
        # Define wrapper for property or the property raises the exception
634
 
        # outside of the assertRaises which fails the test
635
 
        def get_admin_client():
636
 
            heat_ks_client.admin_client
637
 
 
638
 
        self.assertRaises(exception.AuthorizationFailure,
639
 
                          get_admin_client)
640
 
 
641
612
    def test_trust_init(self):
642
613
 
643
614
        """Test consuming a trust when initializing."""
1042
1013
 
1043
1014
        """Test creating ec2 credentials for domain user."""
1044
1015
 
1045
 
        self._stub_domain_admin_client()
 
1016
        self._stub_domain_admin_client(domain_id=None)
1046
1017
 
1047
1018
        ctx = utils.dummy_context()
1048
1019
        ctx.trust_id = None
1302
1273
 
1303
1274
        """Test the delete_stack_domain_project function."""
1304
1275
 
1305
 
        self._stub_domain_admin_client()
 
1276
        self._stub_domain_admin_client(domain_id=None)
1306
1277
        self.mock_admin_client.projects = self.m.CreateMockAnything()
1307
1278
        self.mock_admin_client.projects.get(project='aprojectid').AndRaise(
1308
1279
            kc_exception.NotFound)
1317
1288
 
1318
1289
        """Test the delete_stack_domain_project function."""
1319
1290
 
1320
 
        self._stub_domain_admin_client()
 
1291
        self._stub_domain_admin_client(domain_id=None)
1321
1292
        self.mock_admin_client.projects = self.m.CreateMockAnything()
1322
1293
        self.mock_admin_client.projects.get(project='aprojectid').AndRaise(
1323
1294
            kc_exception.Forbidden)
1366
1337
 
1367
1338
    def test_stack_domain_user_token(self):
1368
1339
        """Test stack_domain_user_token function."""
1369
 
        dummysession = self.m.CreateMockAnything()
1370
 
        dummyresp = self.m.CreateMockAnything()
1371
 
        dummyresp.headers = {'X-Subject-Token': 'dummytoken'}
1372
 
        dummysession.post('http://server.test:5000/v3/auth/tokens?nocatalog',
1373
 
                          authenticated=False,
1374
 
                          headers={'Accept': 'application/json'},
1375
 
                          json=mox.IgnoreArg()).AndReturn(dummyresp)
1376
 
        self.m.StubOutWithMock(ks_session, 'Session')
1377
 
        ks_session.Session.construct(mox.IsA(dict)).AndReturn(dummysession)
 
1340
        dum_tok = 'dummytoken'
 
1341
        mock_ks_auth = self.m.CreateMockAnything()
 
1342
        mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn(dum_tok)
 
1343
 
 
1344
        m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
 
1345
                                password='apassw',
 
1346
                                project_id='aproject',
 
1347
                                user_id='duser',
 
1348
                                include_catalog=False)
 
1349
        m.AndReturn(mock_ks_auth)
 
1350
 
1378
1351
        self.m.ReplayAll()
1379
1352
 
1380
1353
        ctx = utils.dummy_context()
1381
1354
        ctx.trust_id = None
1382
1355
        heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
1383
 
        token = heat_ks_client.stack_domain_user_token(
1384
 
            user_id='duser', project_id='aproject', password='apassw')
1385
 
        self.assertEqual('dummytoken', token)
 
1356
        token = heat_ks_client.stack_domain_user_token(user_id='duser',
 
1357
                                                       project_id='aproject',
 
1358
                                                       password='apassw')
 
1359
        self.assertEqual(dum_tok, token)
1386
1360
 
1387
1361
    def test_stack_domain_user_token_err_nodomain(self):
1388
1362
        """Test stack_domain_user_token error path."""
1489
1463
    def _clear_domain_override(self):
1490
1464
        cfg.CONF.clear_override('stack_user_domain_name')
1491
1465
 
1492
 
    def _stub_domain_admin_client(self, auth_ok=True):
1493
 
        kc_v3.Client(
1494
 
            auth_url='http://server.test:5000/v3',
1495
 
            cacert=None,
1496
 
            cert=None,
1497
 
            endpoint='http://server.test:5000/v3',
1498
 
            insecure=False,
1499
 
            key=None,
1500
 
            password='adminsecret',
1501
 
            user_domain_name='fake_domain_name',
1502
 
            username='adminuser123').AndReturn(self.mock_admin_client)
 
1466
    def _stub_domain_admin_client_domain_get(self):
 
1467
        dummy_domain = self.m.CreateMockAnything()
 
1468
        dummy_domain.id = 'adomain123'
 
1469
        self.mock_ks_v3_client_domain_mngr.list(
 
1470
            name='fake_domain_name').AndReturn([dummy_domain])
 
1471
 
 
1472
    def _stub_domain_admin_client(self, domain_id='adomain123'):
 
1473
        mock_ks_auth = self.m.CreateMockAnything()
 
1474
        mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn('tok')
 
1475
 
 
1476
        if domain_id:
 
1477
            a = self.m.CreateMockAnything()
 
1478
            a.domain_id = domain_id
 
1479
            mock_ks_auth.get_access(mox.IsA(ks_session.Session)).AndReturn(a)
 
1480
 
 
1481
        m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
 
1482
                                password='adminsecret',
 
1483
                                domain_id=None,
 
1484
                                domain_name='fake_domain_name',
 
1485
                                user_domain_id=None,
 
1486
                                user_domain_name='fake_domain_name',
 
1487
                                username='adminuser123')
 
1488
 
 
1489
        m.AndReturn(mock_ks_auth)
 
1490
 
 
1491
        n = kc_v3.Client(session=mox.IsA(ks_session.Session),
 
1492
                         auth=mock_ks_auth)
 
1493
        n.AndReturn(self.mock_admin_client)
 
1494
 
1503
1495
        self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
1504
 
        self.mock_admin_client.authenticate(
1505
 
            domain_name='fake_domain_name').AndReturn(auth_ok)
1506
 
        if auth_ok:
1507
 
            self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
1508
 
            self.mock_admin_client.auth_ref.user_id = '1234'
1509
 
            self.mock_admin_client.auth_ref.domain_id = 'adomain123'
1510
1496
 
1511
1497
    def _stub_domain_user_pw_auth(self):
1512
1498
        ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
1583
1569
    def test_create_stack_domain_user(self):
1584
1570
        p = super(KeystoneClientTestDomainName, self)
1585
1571
        p.test_create_stack_domain_user()
 
1572
 
 
1573
 
 
1574
class HeatClientTest(KeystoneClientTest):
 
1575
    """Test cases for heat.common.config"""
 
1576
 
 
1577
    def setUp(self):
 
1578
        super(HeatClientTest, self).setUp()
 
1579
 
 
1580
    def test_init_auth_encryption_key_length(self):
 
1581
        """Test for length of the auth_encryption_length in config file"""
 
1582
        cfg.CONF.set_override('auth_encryption_key', 'abcdefghijklma')
 
1583
        err = self.assertRaises(exception.Error,
 
1584
                                config.startup_sanity_check)
 
1585
        exp_msg = ('heat.conf misconfigured, auth_encryption_key '
 
1586
                   'length must be 16, 24 or 32')
 
1587
        self.assertIn(exp_msg, six.text_type(err))