~ubuntu-branches/ubuntu/wily/keystone/wily

« back to all changes in this revision

Viewing changes to keystone/tests/test_auth.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short, Chuck Short, James Page, Adam Gandelman
  • Date: 2013-09-09 18:02:41 UTC
  • mfrom: (1.1.36)
  • Revision ID: package-import@ubuntu.com-20130909180241-5pizm6rcauhg4x93
Tags: 1:2013.2~b3-0ubuntu1
[ Chuck Short ]
* New upstream release. 
* debian/control: Add python-oslo.sphinx as a build dependency.
* debian/control: Add python-babel as a build dependency.
* debian/control: Add python-dogpile.cache as a build dependency.
* debian/control: Add python-oauth2 as a build dependency. 
* debian/patches/sql_connection.patch: Refreshed

[ James Page ]
* d/patches/fix-ubuntu-tests.patch: Fixup for new tests location.
* d/patches/ubuntu-test-overrides.patch: Override testing defaults
  using patches.
* d/rules: Rework for patching approach for test_overrides.conf.
* d/tests/test_overrides.conf: Dropped - no longer required.
* d/control: Add python-netaddr to BD's.

[ Adam Gandelman ]
* debian/control: Add python-testtools to Build-Depends.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2012 OpenStack LLC
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
4
# not use this file except in compliance with the License. You may obtain
 
5
# a copy of the License at
 
6
#
 
7
#      http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
11
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
12
# License for the specific language governing permissions and limitations
 
13
# under the License.
 
14
 
 
15
import copy
 
16
import datetime
 
17
import uuid
 
18
 
 
19
from keystone.tests import core as test
 
20
 
 
21
from keystone import auth
 
22
from keystone import config
 
23
from keystone import exception
 
24
from keystone.openstack.common import timeutils
 
25
from keystone import token
 
26
from keystone import trust
 
27
 
 
28
import default_fixtures
 
29
 
 
30
 
 
31
CONF = config.CONF
 
32
TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
 
33
 
 
34
 
 
35
def _build_user_auth(token=None, user_id=None, username=None,
 
36
                     password=None, tenant_id=None, tenant_name=None,
 
37
                     trust_id=None):
 
38
    """Build auth dictionary.
 
39
 
 
40
    It will create an auth dictionary based on all the arguments
 
41
    that it receives.
 
42
    """
 
43
    auth_json = {}
 
44
    if token is not None:
 
45
        auth_json['token'] = token
 
46
    if username or password:
 
47
        auth_json['passwordCredentials'] = {}
 
48
    if username is not None:
 
49
        auth_json['passwordCredentials']['username'] = username
 
50
    if user_id is not None:
 
51
        auth_json['passwordCredentials']['userId'] = user_id
 
52
    if password is not None:
 
53
        auth_json['passwordCredentials']['password'] = password
 
54
    if tenant_name is not None:
 
55
        auth_json['tenantName'] = tenant_name
 
56
    if tenant_id is not None:
 
57
        auth_json['tenantId'] = tenant_id
 
58
    if trust_id is not None:
 
59
        auth_json['trust_id'] = trust_id
 
60
    return auth_json
 
61
 
 
62
 
 
63
class AuthTest(test.TestCase):
    """Shared fixture for the auth controller tests in this module.

    ``setUp`` selects the KVS identity driver, loads the backends and the
    default fixtures, and exposes a token ``Auth`` controller as
    ``self.controller``.
    """

    def setUp(self):
        super(AuthTest, self).setUp()

        CONF.identity.driver = 'keystone.identity.backends.kvs.Identity'
        self.load_backends()
        self.load_fixtures(default_fixtures)

        # need to register the token provider first because auth controller
        # depends on it
        token.provider.Manager()

        self.controller = token.controllers.Auth()

    def assertEqualTokens(self, a, b):
        """Assert that two tokens are equal.

        The ``expires`` and ``issued_at`` timestamps are compared with
        ``assertCloseEnoughForGovernmentWork``; everything else except the
        token id is compared with ``assertDictEqual``.

        The comparison works on deep copies, so the token dictionaries
        passed in by the caller are left untouched.
        """
        def normalize(token_ref):
            # Work on a copy: overwriting the id and dropping the
            # timestamps must not leak into the caller's dictionaries.
            normalized = copy.deepcopy(token_ref)
            normalized['access']['token']['id'] = 'dummy'
            del normalized['access']['token']['expires']
            del normalized['access']['token']['issued_at']
            return normalized

        self.assertCloseEnoughForGovernmentWork(
            timeutils.parse_isotime(a['access']['token']['expires']),
            timeutils.parse_isotime(b['access']['token']['expires']))
        self.assertCloseEnoughForGovernmentWork(
            timeutils.parse_isotime(a['access']['token']['issued_at']),
            timeutils.parse_isotime(b['access']['token']['issued_at']))
        return self.assertDictEqual(normalize(a), normalize(b))
 
96
 
 
97
 
 
98
class AuthBadRequests(AuthTest):
    """Requests that the auth controller is expected to reject."""

    def setUp(self):
        super(AuthBadRequests, self).setUp()

    def test_no_external_auth(self):
        """_authenticate_external() raises when called with empty dicts."""
        self.assertRaises(token.controllers.ExternalAuthNotApplicable,
                          self.controller._authenticate_external,
                          {},
                          {})

    def test_no_token_in_auth(self):
        """_authenticate_token() raises a validation error with no token."""
        self.assertRaises(exception.ValidationError,
                          self.controller._authenticate_token,
                          None,
                          {})

    def test_no_credentials_in_auth(self):
        """_authenticate_local() raises a validation error with no creds."""
        self.assertRaises(exception.ValidationError,
                          self.controller._authenticate_local,
                          None,
                          {})

    def test_authenticate_blank_request_body(self):
        """An empty json dict is rejected with a validation error."""
        empty_body = {}
        self.assertRaises(
            exception.ValidationError,
            self.controller.authenticate,
            {}, empty_body)

    def test_authenticate_blank_auth(self):
        """A blank 'auth' is rejected with a validation error."""
        blank_auth = _build_user_auth()
        self.assertRaises(
            exception.ValidationError,
            self.controller.authenticate,
            {}, blank_auth)

    def test_authenticate_invalid_auth_content(self):
        """A non-dict 'auth' value is rejected with a validation error."""
        bad_body = {'auth': 'abcd'}
        self.assertRaises(
            exception.ValidationError,
            self.controller.authenticate,
            {}, bad_body)

    def test_authenticate_user_id_too_large(self):
        """A 65 character 'userId' is rejected with a size error."""
        oversized = '0' * 65
        request_body = _build_user_auth(user_id=oversized,
                                        username='FOO',
                                        password='foo2')
        self.assertRaises(
            exception.ValidationSizeError,
            self.controller.authenticate,
            {}, request_body)

    def test_authenticate_username_too_large(self):
        """A 65 character 'username' is rejected with a size error."""
        oversized = '0' * 65
        request_body = _build_user_auth(username=oversized,
                                        password='foo2')
        self.assertRaises(
            exception.ValidationSizeError,
            self.controller.authenticate,
            {}, request_body)

    def test_authenticate_tenant_id_too_large(self):
        """A 65 character 'tenantId' is rejected with a size error."""
        oversized = '0' * 65
        request_body = _build_user_auth(username='FOO',
                                        password='foo2',
                                        tenant_id=oversized)
        self.assertRaises(
            exception.ValidationSizeError,
            self.controller.authenticate,
            {}, request_body)

    def test_authenticate_tenant_name_too_large(self):
        """A 65 character 'tenantName' is rejected with a size error."""
        oversized = '0' * 65
        request_body = _build_user_auth(username='FOO',
                                        password='foo2',
                                        tenant_name=oversized)
        self.assertRaises(
            exception.ValidationSizeError,
            self.controller.authenticate,
            {}, request_body)

    def test_authenticate_token_too_large(self):
        """An 8193 character token id is rejected with a size error."""
        oversized_token = {'id': '0' * 8193}
        request_body = _build_user_auth(token=oversized_token)
        self.assertRaises(
            exception.ValidationSizeError,
            self.controller.authenticate,
            {}, request_body)

    def test_authenticate_password_too_large(self):
        """A password one char over the configured maximum is rejected."""
        too_long = CONF.identity.max_password_length + 1
        request_body = _build_user_auth(username='FOO',
                                        password='0' * too_long)
        self.assertRaises(
            exception.ValidationSizeError,
            self.controller.authenticate,
            {}, request_body)
 
187
 
 
188
 
 
189
class AuthWithToken(AuthTest):
    """Tests that obtain, exchange and validate tokens via the controller."""

    def setUp(self):
        super(AuthWithToken, self).setUp()

    def test_unscoped_token(self):
        """Verify getting an unscoped token with password creds."""
        body_dict = _build_user_auth(username='FOO',
                                     password='foo2')
        unscoped_token = self.controller.authenticate({}, body_dict)
        # An unscoped token carries no tenant entry.
        self.assertIsNone(unscoped_token["access"]["token"].get("tenant"))

    def test_auth_invalid_token(self):
        """Verify exception is raised if the token id is unknown."""
        body_dict = _build_user_auth(token={"id": uuid.uuid4().hex})
        self.assertRaises(
            exception.Unauthorized,
            self.controller.authenticate,
            {}, body_dict)

    def test_auth_bad_formatted_token(self):
        """Verify exception is raised if the token dict is empty."""
        body_dict = _build_user_auth(token={})
        self.assertRaises(
            exception.ValidationError,
            self.controller.authenticate,
            {}, body_dict)

    def test_auth_unscoped_token_no_project(self):
        """Verify getting an unscoped token with an unscoped token."""
        body_dict = _build_user_auth(
            username='FOO',
            password='foo2')
        unscoped_token = self.controller.authenticate({}, body_dict)

        body_dict = _build_user_auth(
            token=unscoped_token["access"]["token"])
        unscoped_token_2 = self.controller.authenticate({}, body_dict)

        self.assertEqualTokens(unscoped_token, unscoped_token_2)

    def test_auth_unscoped_token_project(self):
        """Verify getting a token in a tenant with an unscoped token."""
        # Add a role in so we can check we get this back
        self.identity_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_bar['id'],
            self.role_member['id'])
        # Get an unscoped token
        body_dict = _build_user_auth(
            username='FOO',
            password='foo2')
        unscoped_token = self.controller.authenticate({}, body_dict)
        # Get a token on BAR tenant using the unscoped token
        body_dict = _build_user_auth(
            token=unscoped_token["access"]["token"],
            tenant_name="BAR")
        scoped_token = self.controller.authenticate({}, body_dict)

        tenant = scoped_token["access"]["token"]["tenant"]
        roles = scoped_token["access"]["metadata"]["roles"]
        self.assertEqual(tenant["id"], self.tenant_bar['id'])
        self.assertEqual(roles[0], self.role_member['id'])

    def test_auth_token_project_group_role(self):
        """Verify getting a token in a tenant with group roles."""
        # Add a v2 style role in so we can check we get this back
        self.identity_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_bar['id'],
            self.role_member['id'])
        # Now create a group role for this user as well
        new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
                     'name': uuid.uuid4().hex}
        self.identity_api.create_group(new_group['id'], new_group)
        self.identity_api.add_user_to_group(self.user_foo['id'],
                                            new_group['id'])
        self.identity_api.create_grant(
            group_id=new_group['id'],
            project_id=self.tenant_bar['id'],
            role_id=self.role_admin['id'])

        # Get a scoped token for the tenant
        body_dict = _build_user_auth(
            username='FOO',
            password='foo2',
            tenant_name="BAR")

        scoped_token = self.controller.authenticate({}, body_dict)

        tenant = scoped_token["access"]["token"]["tenant"]
        roles = scoped_token["access"]["metadata"]["roles"]
        self.assertEqual(tenant["id"], self.tenant_bar['id'])
        # Both the directly assigned role and the group role are expected.
        self.assertIn(self.role_member['id'], roles)
        self.assertIn(self.role_admin['id'], roles)

    def test_auth_token_cross_domain_group_and_project(self):
        """Verify getting a token in cross domain group/project roles."""
        # create domain, project and group and grant roles to user
        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
        self.identity_api.create_domain(domain1['id'], domain1)
        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
                    'domain_id': domain1['id']}
        self.assignment_api.create_project(project1['id'], project1)
        role_foo_domain1 = {'id': uuid.uuid4().hex,
                            'name': uuid.uuid4().hex}
        self.identity_api.create_role(role_foo_domain1['id'],
                                      role_foo_domain1)
        role_group_domain1 = {'id': uuid.uuid4().hex,
                              'name': uuid.uuid4().hex}
        self.identity_api.create_role(role_group_domain1['id'],
                                      role_group_domain1)
        self.identity_api.add_user_to_project(project1['id'],
                                              self.user_foo['id'])
        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain1['id'],
                     'name': uuid.uuid4().hex}
        self.identity_api.create_group(new_group['id'], new_group)
        self.identity_api.add_user_to_group(self.user_foo['id'],
                                            new_group['id'])
        # Project-level grants: these are expected in the token.
        self.identity_api.create_grant(
            user_id=self.user_foo['id'],
            project_id=project1['id'],
            role_id=self.role_member['id'])
        self.identity_api.create_grant(
            group_id=new_group['id'],
            project_id=project1['id'],
            role_id=self.role_admin['id'])
        # Domain-level grants: these must not show up in a project token.
        self.identity_api.create_grant(
            user_id=self.user_foo['id'],
            domain_id=domain1['id'],
            role_id=role_foo_domain1['id'])
        self.identity_api.create_grant(
            group_id=new_group['id'],
            domain_id=domain1['id'],
            role_id=role_group_domain1['id'])

        # Get a scoped token for the tenant
        body_dict = _build_user_auth(
            username=self.user_foo['name'],
            password=self.user_foo['password'],
            tenant_name=project1['name'])

        scoped_token = self.controller.authenticate({}, body_dict)
        tenant = scoped_token["access"]["token"]["tenant"]
        roles = scoped_token["access"]["metadata"]["roles"]
        self.assertEqual(tenant["id"], project1['id'])
        self.assertIn(self.role_member['id'], roles)
        self.assertIn(self.role_admin['id'], roles)
        self.assertNotIn(role_foo_domain1['id'], roles)
        self.assertNotIn(role_group_domain1['id'], roles)

    def test_belongs_to_no_tenant(self):
        """Validating an unscoped token with belongsTo is unauthorized."""
        r = self.controller.authenticate(
            {},
            auth={
                'passwordCredentials': {
                    'username': self.user_foo['name'],
                    'password': self.user_foo['password']
                }
            })
        unscoped_token_id = r['access']['token']['id']
        self.assertRaises(
            exception.Unauthorized,
            self.controller.validate_token,
            dict(is_admin=True, query_string={'belongsTo': 'BAR'}),
            token_id=unscoped_token_id)

    def test_belongs_to(self):
        """Validating a scoped token with these belongsTo values fails."""
        body_dict = _build_user_auth(
            username='FOO',
            password='foo2',
            tenant_name="BAR")

        scoped_token = self.controller.authenticate({}, body_dict)
        scoped_token_id = scoped_token['access']['token']['id']

        self.assertRaises(
            exception.Unauthorized,
            self.controller.validate_token,
            dict(is_admin=True, query_string={'belongsTo': 'me'}),
            token_id=scoped_token_id)

        # NOTE(review): 'BAR' is the tenant *name* used to scope the token;
        # presumably belongsTo is matched against the tenant id, which is
        # why this is still expected to be rejected - confirm.
        self.assertRaises(
            exception.Unauthorized,
            self.controller.validate_token,
            dict(is_admin=True, query_string={'belongsTo': 'BAR'}),
            token_id=scoped_token_id)

    def test_token_auth_with_binding(self):
        """Bind information is kept when re-authenticating with a token."""
        CONF.token.bind = ['kerberos']
        body_dict = _build_user_auth()
        context = {'REMOTE_USER': 'FOO', 'AUTH_TYPE': 'Negotiate'}
        unscoped_token = self.controller.authenticate(context, body_dict)

        # the token should have bind information in it
        bind = unscoped_token['access']['token']['bind']
        self.assertEqual(bind['kerberos'], 'FOO')

        body_dict = _build_user_auth(
            token=unscoped_token['access']['token'],
            tenant_name='BAR')

        # using unscoped token without remote user context fails
        self.assertRaises(
            exception.Unauthorized,
            self.controller.authenticate,
            {}, body_dict)

        # using token with remote user context succeeds
        scoped_token = self.controller.authenticate(context, body_dict)

        # the bind information should be carried over from the original token
        bind = scoped_token['access']['token']['bind']
        self.assertEqual(bind['kerberos'], 'FOO')
 
403
 
 
404
 
 
405
class AuthWithPasswordCredentials(AuthTest):
    """Authentication attempts driven by passwordCredentials."""

    def setUp(self):
        super(AuthWithPasswordCredentials, self).setUp()

    def test_auth_invalid_user(self):
        """A random username/password pair is unauthorized."""
        random_name = uuid.uuid4().hex
        random_password = uuid.uuid4().hex
        request_body = _build_user_auth(username=random_name,
                                        password=random_password)
        self.assertRaises(exception.Unauthorized,
                          self.controller.authenticate,
                          {}, request_body)

    def test_auth_valid_user_invalid_password(self):
        """A known user with a random password is unauthorized."""
        request_body = _build_user_auth(username="FOO",
                                        password=uuid.uuid4().hex)
        self.assertRaises(exception.Unauthorized,
                          self.controller.authenticate,
                          {}, request_body)

    def test_auth_empty_password(self):
        """A known user with an empty password is unauthorized."""
        request_body = _build_user_auth(username="FOO", password="")
        self.assertRaises(exception.Unauthorized,
                          self.controller.authenticate,
                          {}, request_body)

    def test_auth_no_password(self):
        """Omitting the password entirely is a validation error."""
        request_body = _build_user_auth(username="FOO")
        self.assertRaises(exception.ValidationError,
                          self.controller.authenticate,
                          {}, request_body)

    def test_authenticate_blank_password_credentials(self):
        """An empty passwordCredentials dict is a validation error."""
        request_body = {'passwordCredentials': {}, 'tenantName': 'demo'}
        self.assertRaises(
            exception.ValidationError,
            self.controller.authenticate,
            {}, request_body)

    def test_authenticate_no_username(self):
        """Omitting the username is a validation error."""
        request_body = _build_user_auth(password="pass", tenant_name="demo")
        self.assertRaises(
            exception.ValidationError,
            self.controller.authenticate,
            {}, request_body)

    def test_bind_without_remote_user(self):
        """No bind data is added when there is no remote user context."""
        CONF.token.bind = ['kerberos']
        request_body = _build_user_auth(username='FOO',
                                        password='foo2',
                                        tenant_name='BAR')
        issued = self.controller.authenticate({}, request_body)
        self.assertNotIn('bind', issued['access']['token'])
 
468
 
 
469
 
 
470
class AuthWithRemoteUser(AuthTest):
    """Authentication where the user comes from REMOTE_USER in the context."""

    def setUp(self):
        super(AuthWithRemoteUser, self).setUp()

    def test_unscoped_remote_authn(self):
        """An unscoped token via REMOTE_USER matches the password one."""
        password_request = _build_user_auth(username='FOO', password='foo2')
        local_token = self.controller.authenticate({}, password_request)

        external_request = _build_user_auth()
        remote_context = {'REMOTE_USER': 'FOO'}
        remote_token = self.controller.authenticate(remote_context,
                                                    external_request)

        self.assertEqualTokens(local_token, remote_token)

    def test_unscoped_remote_authn_jsonless(self):
        """External auth with a None request body is a validation error."""
        remote_context = {'REMOTE_USER': 'FOO'}
        self.assertRaises(exception.ValidationError,
                          self.controller.authenticate,
                          remote_context,
                          None)

    def test_scoped_remote_authn(self):
        """A scoped token via REMOTE_USER matches the password one."""
        password_request = _build_user_auth(username='FOO',
                                            password='foo2',
                                            tenant_name='BAR')
        local_token = self.controller.authenticate({}, password_request)

        external_request = _build_user_auth(tenant_name='BAR')
        remote_context = {'REMOTE_USER': 'FOO'}
        remote_token = self.controller.authenticate(remote_context,
                                                    external_request)

        self.assertEqualTokens(local_token, remote_token)

    def test_scoped_nometa_remote_authn(self):
        """Verify getting a token with external authn and no metadata."""
        password_request = _build_user_auth(username='TWO',
                                            password='two2',
                                            tenant_name='BAZ')
        local_token = self.controller.authenticate({}, password_request)

        external_request = _build_user_auth(tenant_name='BAZ')
        remote_context = {'REMOTE_USER': 'TWO'}
        remote_token = self.controller.authenticate(remote_context,
                                                    external_request)

        self.assertEqualTokens(local_token, remote_token)

    def test_scoped_remote_authn_invalid_user(self):
        """External auth with a random REMOTE_USER is unauthorized."""
        external_request = _build_user_auth(tenant_name="BAR")
        remote_context = {'REMOTE_USER': uuid.uuid4().hex}
        self.assertRaises(exception.Unauthorized,
                          self.controller.authenticate,
                          remote_context,
                          external_request)

    def test_bind_with_kerberos(self):
        """With kerberos bind configured the token is bound to the user."""
        CONF.token.bind = ['kerberos']
        kerberos_context = {'REMOTE_USER': 'FOO', 'AUTH_TYPE': 'Negotiate'}
        external_request = _build_user_auth(tenant_name="BAR")
        issued = self.controller.authenticate(kerberos_context,
                                              external_request)
        bound_user = issued['access']['token']['bind']['kerberos']
        self.assertEqual(bound_user, 'FOO')

    def test_bind_without_config_opt(self):
        """With only x509 bind configured no bind data is added."""
        CONF.token.bind = ['x509']
        kerberos_context = {'REMOTE_USER': 'FOO', 'AUTH_TYPE': 'Negotiate'}
        external_request = _build_user_auth(tenant_name='BAR')
        issued = self.controller.authenticate(kerberos_context,
                                              external_request)
        self.assertNotIn('bind', issued['access']['token'])
 
549
 
 
550
 
 
551
class AuthWithTrust(AuthTest):
 
552
    def setUp(self):
 
553
        super(AuthWithTrust, self).setUp()
 
554
        self.opt_in_group('trust', enabled=True)
 
555
 
 
556
        trust.Manager()
 
557
        self.trust_controller = trust.controllers.TrustV3()
 
558
        self.auth_v3_controller = auth.controllers.Auth()
 
559
        self.trustor = self.user_foo
 
560
        self.trustee = self.user_two
 
561
        self.assigned_roles = [self.role_member['id'],
 
562
                               self.role_browser['id']]
 
563
        for assigned_role in self.assigned_roles:
 
564
            self.identity_api.add_role_to_user_and_project(
 
565
                self.trustor['id'], self.tenant_bar['id'], assigned_role)
 
566
 
 
567
        self.sample_data = {'trustor_user_id': self.trustor['id'],
 
568
                            'trustee_user_id': self.trustee['id'],
 
569
                            'project_id': self.tenant_bar['id'],
 
570
                            'impersonation': 'True',
 
571
                            'roles': [{'id': self.role_browser['id']},
 
572
                                      {'name': self.role_member['name']}]}
 
573
        expires_at = timeutils.strtime(timeutils.utcnow() +
 
574
                                       datetime.timedelta(minutes=10),
 
575
                                       fmt=TIME_FORMAT)
 
576
        self.create_trust(expires_at=expires_at)
 
577
 
 
578
    def create_trust(self, expires_at=None, impersonation='True'):
 
579
        username = self.trustor['name'],
 
580
        password = 'foo2'
 
581
        body_dict = _build_user_auth(username=username, password=password)
 
582
        self.unscoped_token = self.controller.authenticate({}, body_dict)
 
583
        context = {'token_id': self.unscoped_token['access']['token']['id']}
 
584
        trust_data = copy.deepcopy(self.sample_data)
 
585
        trust_data['expires_at'] = expires_at
 
586
        trust_data['impersonation'] = impersonation
 
587
 
 
588
        self.new_trust = self.trust_controller.create_trust(
 
589
            context, trust=trust_data)['trust']
 
590
 
 
591
    def build_v2_token_request(self, username, password):
 
592
        body_dict = _build_user_auth(username=username, password=password)
 
593
        self.unscoped_token = self.controller.authenticate({}, body_dict)
 
594
        unscoped_token_id = self.unscoped_token['access']['token']['id']
 
595
        request_body = _build_user_auth(token={'id': unscoped_token_id},
 
596
                                        trust_id=self.new_trust['id'],
 
597
                                        tenant_id=self.tenant_bar['id'])
 
598
        return request_body
 
599
 
 
600
    def test_create_trust_bad_data_fails(self):
 
601
        context = {'token_id': self.unscoped_token['access']['token']['id']}
 
602
        bad_sample_data = {'trustor_user_id': self.trustor['id']}
 
603
 
 
604
        self.assertRaises(exception.ValidationError,
 
605
                          self.trust_controller.create_trust,
 
606
                          context, trust=bad_sample_data)
 
607
 
 
608
    def test_create_trust_no_roles(self):
 
609
        self.new_trust = None
 
610
        self.sample_data['roles'] = []
 
611
        self.create_trust()
 
612
        self.assertEquals(self.new_trust['roles'], [])
 
613
 
 
614
    def test_create_trust(self):
 
615
        self.assertEquals(self.new_trust['trustor_user_id'],
 
616
                          self.trustor['id'])
 
617
        self.assertEquals(self.new_trust['trustee_user_id'],
 
618
                          self.trustee['id'])
 
619
        role_ids = [self.role_browser['id'], self.role_member['id']]
 
620
        self.assertTrue(timeutils.parse_strtime(self.new_trust['expires_at'],
 
621
                                                fmt=TIME_FORMAT))
 
622
        self.assertIn('http://localhost:5000/v3/OS-TRUST/',
 
623
                      self.new_trust['links']['self'])
 
624
        self.assertIn('http://localhost:5000/v3/OS-TRUST/',
 
625
                      self.new_trust['roles_links']['self'])
 
626
 
 
627
        for role in self.new_trust['roles']:
 
628
            self.assertIn(role['id'], role_ids)
 
629
 
 
630
    def test_get_trust(self):
 
631
        context = {'token_id': self.unscoped_token['access']['token']['id']}
 
632
        trust = self.trust_controller.get_trust(context,
 
633
                                                self.new_trust['id'])['trust']
 
634
        self.assertEquals(trust['trustor_user_id'],
 
635
                          self.trustor['id'])
 
636
        self.assertEquals(trust['trustee_user_id'],
 
637
                          self.trustee['id'])
 
638
        role_ids = [self.role_browser['id'], self.role_member['id']]
 
639
        for role in self.new_trust['roles']:
 
640
            self.assertIn(role['id'], role_ids)
 
641
 
 
642
    def test_create_trust_no_impersonation(self):
 
643
        self.create_trust(expires_at=None, impersonation='False')
 
644
        self.assertEquals(self.new_trust['trustor_user_id'],
 
645
                          self.trustor['id'])
 
646
        self.assertEquals(self.new_trust['trustee_user_id'],
 
647
                          self.trustee['id'])
 
648
        self.assertEquals(self.new_trust['impersonation'],
 
649
                          'False')
 
650
        auth_response = self.fetch_v2_token_from_trust()
 
651
        token_user = auth_response['access']['user']
 
652
        self.assertEquals(token_user['id'],
 
653
                          self.new_trust['trustee_user_id'])
 
654
 
 
655
        # TODO(ayoung): Endpoints
 
656
 
 
657
    def test_token_from_trust_wrong_user_fails(self):
 
658
        request_body = self.build_v2_token_request('FOO', 'foo2')
 
659
        self.assertRaises(
 
660
            exception.Forbidden,
 
661
            self.controller.authenticate, {}, request_body)
 
662
 
 
663
    def fetch_v2_token_from_trust(self):
 
664
        request_body = self.build_v2_token_request('TWO', 'two2')
 
665
        auth_response = self.controller.authenticate({}, request_body)
 
666
        return auth_response
 
667
 
 
668
    def fetch_v3_token_from_trust(self):
 
669
        v3_password_data = {
 
670
            'identity': {
 
671
                "methods": ["password"],
 
672
                "password": {
 
673
                    "user": {
 
674
                        "id": self.trustee["id"],
 
675
                        "password": self.trustee["password"]}}
 
676
            },
 
677
            'scope': {
 
678
                'project': {
 
679
                    'id': self.tenant_baz['id']}}}
 
680
        auth_response = (self.auth_v3_controller.authenticate_for_token
 
681
                         ({'query_string': {}}, v3_password_data))
 
682
        token = auth_response.headers['X-Subject-Token']
 
683
 
 
684
        v3_req_with_trust = {
 
685
            "identity": {
 
686
                "methods": ["token"],
 
687
                "token": {"id": token}},
 
688
            "scope": {
 
689
                "OS-TRUST:trust": {"id": self.new_trust['id']}}}
 
690
        token_auth_response = (self.auth_v3_controller.authenticate_for_token
 
691
                               ({'query_string': {}}, v3_req_with_trust))
 
692
        return token_auth_response
 
693
 
 
694
    def test_create_v3_token_from_trust(self):
 
695
        auth_response = self.fetch_v3_token_from_trust()
 
696
 
 
697
        trust_token_user = auth_response.json['token']['user']
 
698
        self.assertEquals(trust_token_user['id'], self.trustor['id'])
 
699
 
 
700
        trust_token_trust = auth_response.json['token']['OS-TRUST:trust']
 
701
        self.assertEquals(trust_token_trust['id'], self.new_trust['id'])
 
702
        self.assertEquals(trust_token_trust['trustor_user']['id'],
 
703
                          self.trustor['id'])
 
704
        self.assertEquals(trust_token_trust['trustee_user']['id'],
 
705
                          self.trustee['id'])
 
706
 
 
707
        trust_token_roles = auth_response.json['token']['roles']
 
708
        self.assertEquals(len(trust_token_roles), 2)
 
709
 
 
710
    def test_v3_trust_token_get_token_fails(self):
 
711
        auth_response = self.fetch_v3_token_from_trust()
 
712
        trust_token = auth_response.headers['X-Subject-Token']
 
713
        v3_token_data = {'identity': {
 
714
            'methods': ['token'],
 
715
            'token': {'id': trust_token}
 
716
        }}
 
717
        self.assertRaises(
 
718
            exception.Forbidden,
 
719
            self.auth_v3_controller.authenticate_for_token,
 
720
            {'query_string': {}}, v3_token_data)
 
721
 
 
722
    def test_token_from_trust(self):
 
723
        auth_response = self.fetch_v2_token_from_trust()
 
724
 
 
725
        self.assertIsNotNone(auth_response)
 
726
        self.assertEquals(len(auth_response['access']['metadata']['roles']),
 
727
                          2,
 
728
                          "user_foo has three roles, but the token should"
 
729
                          " only get the two roles specified in the trust.")
 
730
 
 
731
    def assert_token_count_for_trust(self, expected_value):
 
732
        tokens = self.trust_controller.token_api.list_tokens(
 
733
            self.trustee['id'], trust_id=self.new_trust['id'])
 
734
        token_count = len(tokens)
 
735
        self.assertEquals(token_count, expected_value)
 
736
 
 
737
    def test_delete_tokens_for_user_invalidates_tokens_from_trust(self):
 
738
        self.assert_token_count_for_trust(0)
 
739
        self.fetch_v2_token_from_trust()
 
740
        self.assert_token_count_for_trust(1)
 
741
        self.trust_controller._delete_tokens_for_user(self.trustee['id'])
 
742
        self.assert_token_count_for_trust(0)
 
743
 
 
744
    def test_token_from_trust_cant_get_another_token(self):
 
745
        auth_response = self.fetch_v2_token_from_trust()
 
746
        trust_token_id = auth_response['access']['token']['id']
 
747
        request_body = _build_user_auth(token={'id': trust_token_id},
 
748
                                        tenant_id=self.tenant_bar['id'])
 
749
        self.assertRaises(
 
750
            exception.Forbidden,
 
751
            self.controller.authenticate, {}, request_body)
 
752
 
 
753
    def test_delete_trust_revokes_token(self):
 
754
        context = {'token_id': self.unscoped_token['access']['token']['id']}
 
755
        self.fetch_v2_token_from_trust()
 
756
        trust_id = self.new_trust['id']
 
757
        tokens = self.token_api.list_tokens(self.trustor['id'],
 
758
                                            trust_id=trust_id)
 
759
        self.assertEquals(len(tokens), 1)
 
760
        self.trust_controller.delete_trust(context, trust_id=trust_id)
 
761
        tokens = self.token_api.list_tokens(self.trustor['id'],
 
762
                                            trust_id=trust_id)
 
763
        self.assertEquals(len(tokens), 0)
 
764
 
 
765
    def test_token_from_trust_with_no_role_fails(self):
 
766
        for assigned_role in self.assigned_roles:
 
767
            self.identity_api.remove_role_from_user_and_project(
 
768
                self.trustor['id'], self.tenant_bar['id'], assigned_role)
 
769
        request_body = self.build_v2_token_request('TWO', 'two2')
 
770
        self.assertRaises(
 
771
            exception.Forbidden,
 
772
            self.controller.authenticate, {}, request_body)
 
773
 
 
774
    def test_expired_trust_get_token_fails(self):
 
775
        expiry = "1999-02-18T10:10:00Z"
 
776
        self.create_trust(expiry)
 
777
        request_body = self.build_v2_token_request('TWO', 'two2')
 
778
        self.assertRaises(
 
779
            exception.Forbidden,
 
780
            self.controller.authenticate, {}, request_body)
 
781
 
 
782
    def test_token_from_trust_with_wrong_role_fails(self):
 
783
        self.identity_api.add_role_to_user_and_project(
 
784
            self.trustor['id'],
 
785
            self.tenant_bar['id'],
 
786
            self.role_other['id'])
 
787
        for assigned_role in self.assigned_roles:
 
788
            self.identity_api.remove_role_from_user_and_project(
 
789
                self.trustor['id'], self.tenant_bar['id'], assigned_role)
 
790
 
 
791
        request_body = self.build_v2_token_request('TWO', 'two2')
 
792
 
 
793
        self.assertRaises(
 
794
            exception.Forbidden,
 
795
            self.controller.authenticate, {}, request_body)
 
796
 
 
797
 
 
798
class TokenExpirationTest(AuthTest):
    """Checks that a token's ``expires`` value survives re-auth/validation."""

    def _maintain_token_expiration(self):
        """Token expiration should be maintained after re-auth & validation."""
        # NOTE(review): the override set here is not cleared anywhere in this
        # class -- presumably the base test case resets it; confirm.
        timeutils.set_time_override()

        # Step 1: password auth produces the unscoped token whose expiry is
        # the reference value for every later check.
        password_auth = {
            'passwordCredentials': {
                'username': self.user_foo['name'],
                'password': self.user_foo['password'],
            },
        }
        response = self.controller.authenticate({}, auth=password_auth)
        unscoped_token_id = response['access']['token']['id']
        original_expiration = response['access']['token']['expires']

        timeutils.advance_time_seconds(1)

        # Step 2: validating the unscoped token reports the same expiry.
        response = self.controller.validate_token(
            {'is_admin': True, 'query_string': {}},
            token_id=unscoped_token_id)
        self.assertEqual(original_expiration,
                         response['access']['token']['expires'])

        timeutils.advance_time_seconds(1)

        # Step 3: re-authenticating with the token, scoped to tenant_bar,
        # keeps the same expiry on the new token.
        token_auth = {
            'token': {
                'id': unscoped_token_id,
            },
            'tenantId': self.tenant_bar['id'],
        }
        response = self.controller.authenticate({}, auth=token_auth)
        scoped_token_id = response['access']['token']['id']
        self.assertEqual(original_expiration,
                         response['access']['token']['expires'])

        timeutils.advance_time_seconds(1)

        # Step 4: validating the scoped token reports the same expiry too.
        response = self.controller.validate_token(
            {'is_admin': True, 'query_string': {}},
            token_id=scoped_token_id)
        self.assertEqual(original_expiration,
                         response['access']['token']['expires'])

    def test_maintain_uuid_token_expiration(self):
        """Run the expiration check with the 'UUID' signing token format."""
        self.opt_in_group('signing', token_format='UUID')
        self._maintain_token_expiration()
 
844
 
 
845
 
 
846
class NonDefaultAuthTest(test.TestCase):
 
847
 
 
848
    def test_add_non_default_auth_method(self):
 
849
        self.opt_in_group('auth', methods=['password', 'token', 'custom'])
 
850
        config.setup_authentication()
 
851
        self.assertTrue(hasattr(CONF.auth, 'custom'))