~ubuntu-branches/debian/jessie/keystone/jessie

« back to all changes in this revision

Viewing changes to .pc/CVE-2012-4457-Raise_unauthorized_if_tenant_disabled.patch/tests/test_keystoneclient.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2012-10-01 05:52:23 UTC
  • Revision ID: package-import@ubuntu.com-20121001055223-7fldz5pv6lc80w9f
Tags: 2012.1.1-9
* Fixes the sometimes-failing keystone.postrm (db_get can return false in some
conditions), and fixes inconsistent indentation.
* Uses /usr/share/keystone/keystone.conf instead of
/usr/share/doc/keystone/keystone.conf.sample for temporarily storing the conf
file (this was a policy violation, as the doc folder should never be required).
* Fixes CVE-2012-4457: fails to raise Unauthorized user error for disabled
tenant, and CVE-2012-4456: fails to validate tokens in Admin API (Closes: #689210).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 
2
 
 
3
# Copyright 2012 OpenStack LLC
 
4
#
 
5
# Licensed under the Apache License, Version 2.0 (the "License"); you may
 
6
# not use this file except in compliance with the License. You may obtain
 
7
# a copy of the License at
 
8
#
 
9
#      http://www.apache.org/licenses/LICENSE-2.0
 
10
#
 
11
# Unless required by applicable law or agreed to in writing, software
 
12
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 
13
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 
14
# License for the specific language governing permissions and limitations
 
15
# under the License.
 
16
 
 
17
import time
 
18
import uuid
 
19
 
 
20
import nose.exc
 
21
 
 
22
from keystone import test
 
23
 
 
24
import default_fixtures
 
25
 
 
26
OPENSTACK_REPO = 'https://review.openstack.org/p/openstack'
 
27
KEYSTONECLIENT_REPO = '%s/python-keystoneclient.git' % OPENSTACK_REPO
 
28
 
 
29
 
 
30
class CompatTestCase(test.TestCase):
    """Base fixture running keystone against a checked-out keystoneclient.

    ``setUp`` calls ``self.get_checkout()`` to obtain the arguments for
    ``test.checkout_vendor``. That method is not defined on this class, so
    concrete test cases are expected to provide it.
    """

    def setUp(self):
        """Check out the client, load fixtures and start both servers."""
        super(CompatTestCase, self).setUp()

        # Make the vendored checkout importable and forget any copy of
        # keystoneclient that is already loaded (presumably so that later
        # imports pick up the checked-out revision).
        revdir = test.checkout_vendor(*self.get_checkout())
        self.add_path(revdir)
        self.clear_module('keystoneclient')

        self.load_backends()
        self.load_fixtures(default_fixtures)

        self.public_server = self.serveapp('keystone', name='main')
        self.admin_server = self.serveapp('keystone', name='admin')

        # TODO(termie): is_admin is being deprecated once the policy stuff
        #               is all working
        # TODO(termie): add an admin user to the fixtures and use that user
        # override the fixtures, for now
        self.metadata_foobar = self.identity_api.update_metadata(
            self.user_foo['id'], self.tenant_bar['id'],
            dict(roles=['keystone_admin'], is_admin='1'))

    def tearDown(self):
        """Kill both servers and drop the references to them."""
        self.public_server.kill()
        self.admin_server.kill()
        self.public_server = None
        self.admin_server = None
        super(CompatTestCase, self).tearDown()

    def _public_url(self):
        """Return the v2.0 URL of the server started with ``name='main'``."""
        public_port = self.public_server.socket_info['socket'][1]
        return "http://localhost:%s/v2.0" % public_port

    def _admin_url(self):
        """Return the v2.0 URL of the server started with ``name='admin'``."""
        admin_port = self.admin_server.socket_info['socket'][1]
        return "http://localhost:%s/v2.0" % admin_port

    def _client(self, admin=False, **kwargs):
        """Build and authenticate a v2.0 keystoneclient ``Client``.

        :param admin: when true the client's endpoint is the admin URL,
                      otherwise the public URL; authentication always goes
                      through the public URL.
        :param kwargs: forwarded unchanged to ``Client`` (credentials,
                       token, tenant selection, ...).
        :returns: the authenticated client.
        """
        # Imported lazily: setUp() swaps the keystoneclient module on the
        # path, so a module-level import would bind too early.
        from keystoneclient.v2_0 import client as ks_client

        url = self._admin_url() if admin else self._public_url()
        kc = ks_client.Client(endpoint=url,
                              auth_url=self._public_url(),
                              **kwargs)
        kc.authenticate()
        # have to manually overwrite the management url after authentication
        kc.management_url = url
        return kc

    def get_client(self, user_ref=None, tenant_ref=None, admin=False):
        """Return an authenticated client for a fixture-style user dict.

        :param user_ref: dict with ``id``, ``name`` and ``password`` keys;
                         defaults to ``self.user_foo``.
        :param tenant_ref: dict with an ``id`` key; when omitted, the first
                           tenant of the matching ``default_fixtures.USERS``
                           entry is used.
        :param admin: passed through to ``_client``.
        """
        if user_ref is None:
            user_ref = self.user_foo

        if tenant_ref is not None:
            tenant_id = tenant_ref['id']
        else:
            # Start from None so a user that is absent from the fixtures
            # yields an unscoped request instead of an UnboundLocalError.
            tenant_id = None
            for user in default_fixtures.USERS:
                if user['id'] == user_ref['id']:
                    tenant_id = user['tenants'][0]

        return self._client(username=user_ref['name'],
                            password=user_ref['password'],
                            tenant_id=tenant_id,
                            admin=admin)
 
93
 
 
94
 
 
95
class KeystoneClientTests(object):
 
96
    """Tests for all versions of keystoneclient."""
 
97
 
 
98
    def test_authenticate_tenant_name_and_tenants(self):
 
99
        client = self.get_client()
 
100
        tenants = client.tenants.list()
 
101
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
102
 
 
103
    def test_authenticate_tenant_id_and_tenants(self):
 
104
        client = self._client(username=self.user_foo['name'],
 
105
                              password=self.user_foo['password'],
 
106
                              tenant_id='bar')
 
107
        tenants = client.tenants.list()
 
108
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
109
 
 
110
    def test_authenticate_invalid_tenant_id(self):
 
111
        from keystoneclient import exceptions as client_exceptions
 
112
        self.assertRaises(client_exceptions.Unauthorized,
 
113
                          self._client,
 
114
                          username=self.user_foo['name'],
 
115
                          password=self.user_foo['password'],
 
116
                          tenant_id='baz')
 
117
 
 
118
    def test_authenticate_token_no_tenant(self):
 
119
        client = self.get_client()
 
120
        token = client.auth_token
 
121
        token_client = self._client(token=token)
 
122
        tenants = token_client.tenants.list()
 
123
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
124
 
 
125
    def test_authenticate_token_tenant_id(self):
 
126
        client = self.get_client()
 
127
        token = client.auth_token
 
128
        token_client = self._client(token=token, tenant_id='bar')
 
129
        tenants = token_client.tenants.list()
 
130
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
131
 
 
132
    def test_authenticate_token_invalid_tenant_id(self):
 
133
        from keystoneclient import exceptions as client_exceptions
 
134
        client = self.get_client()
 
135
        token = client.auth_token
 
136
        self.assertRaises(client_exceptions.AuthorizationFailure,
 
137
                          self._client, token=token, tenant_id='baz')
 
138
 
 
139
    def test_authenticate_token_tenant_name(self):
 
140
        client = self.get_client()
 
141
        token = client.auth_token
 
142
        token_client = self._client(token=token, tenant_name='BAR')
 
143
        tenants = token_client.tenants.list()
 
144
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
145
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
146
 
 
147
    def test_authenticate_and_delete_token(self):
 
148
        from keystoneclient import exceptions as client_exceptions
 
149
 
 
150
        client = self.get_client(admin=True)
 
151
        token = client.auth_token
 
152
        token_client = self._client(token=token)
 
153
        tenants = token_client.tenants.list()
 
154
        self.assertEquals(tenants[0].id, self.tenant_bar['id'])
 
155
 
 
156
        client.tokens.delete(token_client.auth_token)
 
157
 
 
158
        self.assertRaises(client_exceptions.Unauthorized,
 
159
                          token_client.tenants.list)
 
160
 
 
161
    def test_authenticate_no_password(self):
 
162
        from keystoneclient import exceptions as client_exceptions
 
163
 
 
164
        user_ref = self.user_foo.copy()
 
165
        user_ref['password'] = None
 
166
        self.assertRaises(client_exceptions.AuthorizationFailure,
 
167
                          self.get_client,
 
168
                          user_ref)
 
169
 
 
170
    def test_authenticate_no_username(self):
 
171
        from keystoneclient import exceptions as client_exceptions
 
172
 
 
173
        user_ref = self.user_foo.copy()
 
174
        user_ref['name'] = None
 
175
        self.assertRaises(client_exceptions.AuthorizationFailure,
 
176
                          self.get_client,
 
177
                          user_ref)
 
178
 
 
179
    # FIXME(ja): this test should require the "keystone:admin" role
 
180
    #            (probably the role set via --keystone_admin_role flag)
 
181
    # FIXME(ja): add a test that admin endpoint is only sent to admin user
 
182
    # FIXME(ja): add a test that admin endpoint returns unauthorized if not
 
183
    #            admin
 
184
    def test_tenant_create_update_and_delete(self):
 
185
        from keystoneclient import exceptions as client_exceptions
 
186
 
 
187
        tenant_name = 'original_tenant'
 
188
        tenant_description = 'My original tenant!'
 
189
        tenant_enabled = True
 
190
        client = self.get_client(admin=True)
 
191
 
 
192
        # create, get, and list a tenant
 
193
        tenant = client.tenants.create(tenant_name=tenant_name,
 
194
                                       description=tenant_description,
 
195
                                       enabled=tenant_enabled)
 
196
        self.assertEquals(tenant.name, tenant_name)
 
197
        self.assertEquals(tenant.description, tenant_description)
 
198
        self.assertEquals(tenant.enabled, tenant_enabled)
 
199
 
 
200
        tenant = client.tenants.get(tenant_id=tenant.id)
 
201
        self.assertEquals(tenant.name, tenant_name)
 
202
        self.assertEquals(tenant.description, tenant_description)
 
203
        self.assertEquals(tenant.enabled, tenant_enabled)
 
204
 
 
205
        tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
 
206
        self.assertEquals(tenant.name, tenant_name)
 
207
        self.assertEquals(tenant.description, tenant_description)
 
208
        self.assertEquals(tenant.enabled, tenant_enabled)
 
209
 
 
210
        # update, get, and list a tenant
 
211
        tenant_name = 'updated_tenant'
 
212
        tenant_description = 'Updated tenant!'
 
213
        tenant_enabled = False
 
214
        tenant = client.tenants.update(tenant_id=tenant.id,
 
215
                                       tenant_name=tenant_name,
 
216
                                       enabled=tenant_enabled,
 
217
                                       description=tenant_description)
 
218
        self.assertEquals(tenant.name, tenant_name)
 
219
        self.assertEquals(tenant.description, tenant_description)
 
220
        self.assertEquals(tenant.enabled, tenant_enabled)
 
221
 
 
222
        tenant = client.tenants.get(tenant_id=tenant.id)
 
223
        self.assertEquals(tenant.name, tenant_name)
 
224
        self.assertEquals(tenant.description, tenant_description)
 
225
        self.assertEquals(tenant.enabled, tenant_enabled)
 
226
 
 
227
        tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
 
228
        self.assertEquals(tenant.name, tenant_name)
 
229
        self.assertEquals(tenant.description, tenant_description)
 
230
        self.assertEquals(tenant.enabled, tenant_enabled)
 
231
 
 
232
        # delete, get, and list a tenant
 
233
        client.tenants.delete(tenant=tenant.id)
 
234
        self.assertRaises(client_exceptions.NotFound, client.tenants.get,
 
235
                          tenant.id)
 
236
        self.assertFalse([t for t in client.tenants.list()
 
237
                           if t.id == tenant.id])
 
238
 
 
239
    def test_tenant_delete_404(self):
 
240
        from keystoneclient import exceptions as client_exceptions
 
241
        client = self.get_client(admin=True)
 
242
        self.assertRaises(client_exceptions.NotFound,
 
243
                          client.tenants.delete,
 
244
                          tenant=uuid.uuid4().hex)
 
245
 
 
246
    def test_tenant_get_404(self):
 
247
        from keystoneclient import exceptions as client_exceptions
 
248
        client = self.get_client(admin=True)
 
249
        self.assertRaises(client_exceptions.NotFound,
 
250
                          client.tenants.get,
 
251
                          tenant_id=uuid.uuid4().hex)
 
252
 
 
253
    def test_tenant_update_404(self):
 
254
        from keystoneclient import exceptions as client_exceptions
 
255
        client = self.get_client(admin=True)
 
256
        self.assertRaises(client_exceptions.NotFound,
 
257
                          client.tenants.update,
 
258
                          tenant_id=uuid.uuid4().hex)
 
259
 
 
260
    def test_tenant_list(self):
 
261
        client = self.get_client()
 
262
        tenants = client.tenants.list()
 
263
        self.assertEquals(len(tenants), 1)
 
264
 
 
265
        # Admin endpoint should return *all* tenants
 
266
        client = self.get_client(admin=True)
 
267
        tenants = client.tenants.list()
 
268
        self.assertEquals(len(tenants), len(default_fixtures.TENANTS))
 
269
 
 
270
    def test_invalid_password(self):
 
271
        from keystoneclient import exceptions as client_exceptions
 
272
 
 
273
        good_client = self._client(username=self.user_foo['name'],
 
274
                                   password=self.user_foo['password'])
 
275
        good_client.tenants.list()
 
276
 
 
277
        self.assertRaises(client_exceptions.Unauthorized,
 
278
                          self._client,
 
279
                          username=self.user_foo['name'],
 
280
                          password='invalid')
 
281
 
 
282
    def test_invalid_user_password(self):
 
283
        from keystoneclient import exceptions as client_exceptions
 
284
 
 
285
        self.assertRaises(client_exceptions.Unauthorized,
 
286
                          self._client,
 
287
                          username='blah',
 
288
                          password='blah')
 
289
 
 
290
    def test_change_password_invalidates_token(self):
 
291
        from keystoneclient import exceptions as client_exceptions
 
292
 
 
293
        client = self.get_client(admin=True)
 
294
 
 
295
        username = uuid.uuid4().hex
 
296
        passwd = uuid.uuid4().hex
 
297
        user = client.users.create(name=username, password=passwd,
 
298
                                   email=uuid.uuid4().hex)
 
299
 
 
300
        token_id = client.tokens.authenticate(username=username,
 
301
                                              password=passwd).id
 
302
 
 
303
        # authenticate with a token should work before a password change
 
304
        client.tokens.authenticate(token=token_id)
 
305
 
 
306
        client.users.update_password(user=user.id, password=uuid.uuid4().hex)
 
307
 
 
308
        # authenticate with a token should not work after a password change
 
309
        self.assertRaises(client_exceptions.Unauthorized,
 
310
                          client.tokens.authenticate,
 
311
                          token=token_id)
 
312
 
 
313
    def test_disable_user_invalidates_token(self):
 
314
        from keystoneclient import exceptions as client_exceptions
 
315
 
 
316
        admin_client = self.get_client(admin=True)
 
317
        foo_client = self.get_client(self.user_foo)
 
318
 
 
319
        admin_client.users.update_enabled(user=self.user_foo['id'],
 
320
                                          enabled=False)
 
321
 
 
322
        self.assertRaises(client_exceptions.Unauthorized,
 
323
                          foo_client.tokens.authenticate,
 
324
                          token=foo_client.auth_token)
 
325
 
 
326
        self.assertRaises(client_exceptions.Unauthorized,
 
327
                          self.get_client,
 
328
                          self.user_foo)
 
329
 
 
330
    def test_token_expiry_maintained(self):
 
331
        foo_client = self.get_client(self.user_foo)
 
332
        orig_token = foo_client.service_catalog.catalog['token']
 
333
 
 
334
        time.sleep(1.01)
 
335
        reauthenticated_token = foo_client.tokens.authenticate(
 
336
                                    token=foo_client.auth_token)
 
337
 
 
338
        self.assertEquals(orig_token['expires'],
 
339
                          reauthenticated_token.expires)
 
340
 
 
341
    def test_user_create_update_delete(self):
 
342
        from keystoneclient import exceptions as client_exceptions
 
343
 
 
344
        test_username = 'new_user'
 
345
        client = self.get_client(admin=True)
 
346
        user = client.users.create(name=test_username,
 
347
                                   password='password',
 
348
                                   email='user1@test.com')
 
349
        self.assertEquals(user.name, test_username)
 
350
 
 
351
        user = client.users.get(user=user.id)
 
352
        self.assertEquals(user.name, test_username)
 
353
 
 
354
        user = client.users.update(user=user,
 
355
                                   name=test_username,
 
356
                                   email='user2@test.com')
 
357
        self.assertEquals(user.email, 'user2@test.com')
 
358
 
 
359
        # NOTE(termie): update_enabled doesn't return anything, probably a bug
 
360
        client.users.update_enabled(user=user, enabled=False)
 
361
        user = client.users.get(user.id)
 
362
        self.assertFalse(user.enabled)
 
363
 
 
364
        self.assertRaises(client_exceptions.Unauthorized,
 
365
                  self._client,
 
366
                  username=test_username,
 
367
                  password='password')
 
368
        client.users.update_enabled(user, True)
 
369
 
 
370
        user = client.users.update_password(user=user, password='password2')
 
371
 
 
372
        self._client(username=test_username,
 
373
                     password='password2')
 
374
 
 
375
        user = client.users.update_tenant(user=user, tenant='bar')
 
376
        # TODO(ja): once keystonelight supports default tenant
 
377
        #           when you login without specifying tenant, the
 
378
        #           token should be scoped to tenant 'bar'
 
379
 
 
380
        client.users.delete(user.id)
 
381
        self.assertRaises(client_exceptions.NotFound, client.users.get,
 
382
                          user.id)
 
383
 
 
384
        # Test creating a user with a tenant (auto-add to tenant)
 
385
        user2 = client.users.create(name=test_username,
 
386
                                    password='password',
 
387
                                    email='user1@test.com',
 
388
                                    tenant_id='bar')
 
389
        self.assertEquals(user2.name, test_username)
 
390
 
 
391
    def test_user_create_404(self):
 
392
        from keystoneclient import exceptions as client_exceptions
 
393
        client = self.get_client(admin=True)
 
394
        self.assertRaises(client_exceptions.NotFound,
 
395
                          client.users.create,
 
396
                          name=uuid.uuid4().hex,
 
397
                          password=uuid.uuid4().hex,
 
398
                          email=uuid.uuid4().hex,
 
399
                          tenant_id=uuid.uuid4().hex)
 
400
 
 
401
    def test_user_get_404(self):
 
402
        from keystoneclient import exceptions as client_exceptions
 
403
        client = self.get_client(admin=True)
 
404
        self.assertRaises(client_exceptions.NotFound,
 
405
                          client.users.get,
 
406
                          user=uuid.uuid4().hex)
 
407
 
 
408
    def test_user_list_404(self):
 
409
        from keystoneclient import exceptions as client_exceptions
 
410
        client = self.get_client(admin=True)
 
411
        self.assertRaises(client_exceptions.NotFound,
 
412
                          client.users.list,
 
413
                          tenant_id=uuid.uuid4().hex)
 
414
 
 
415
    def test_user_update_404(self):
 
416
        from keystoneclient import exceptions as client_exceptions
 
417
        client = self.get_client(admin=True)
 
418
        self.assertRaises(client_exceptions.NotFound,
 
419
                          client.users.update,
 
420
                          user=uuid.uuid4().hex)
 
421
 
 
422
    def test_user_update_tenant_404(self):
 
423
        raise nose.exc.SkipTest('N/A')
 
424
        from keystoneclient import exceptions as client_exceptions
 
425
        client = self.get_client(admin=True)
 
426
        self.assertRaises(client_exceptions.NotFound,
 
427
                          client.users.update,
 
428
                          user=self.user_foo['id'],
 
429
                          tenant_id=uuid.uuid4().hex)
 
430
 
 
431
    def test_user_update_password_404(self):
 
432
        from keystoneclient import exceptions as client_exceptions
 
433
        client = self.get_client(admin=True)
 
434
        self.assertRaises(client_exceptions.NotFound,
 
435
                          client.users.update_password,
 
436
                          user=uuid.uuid4().hex,
 
437
                          password=uuid.uuid4().hex)
 
438
 
 
439
    def test_user_delete_404(self):
 
440
        from keystoneclient import exceptions as client_exceptions
 
441
        client = self.get_client(admin=True)
 
442
        self.assertRaises(client_exceptions.NotFound,
 
443
                          client.users.delete,
 
444
                          user=uuid.uuid4().hex)
 
445
 
 
446
    def test_user_list(self):
 
447
        client = self.get_client(admin=True)
 
448
        users = client.users.list()
 
449
        self.assertTrue(len(users) > 0)
 
450
        user = users[0]
 
451
        self.assertRaises(AttributeError, lambda: user.password)
 
452
 
 
453
    def test_user_get(self):
 
454
        client = self.get_client(admin=True)
 
455
        user = client.users.get(user=self.user_foo['id'])
 
456
        self.assertRaises(AttributeError, lambda: user.password)
 
457
 
 
458
    def test_role_get(self):
 
459
        client = self.get_client(admin=True)
 
460
        role = client.roles.get(role='keystone_admin')
 
461
        self.assertEquals(role.id, 'keystone_admin')
 
462
 
 
463
    def test_role_crud(self):
 
464
        from keystoneclient import exceptions as client_exceptions
 
465
 
 
466
        test_role = 'new_role'
 
467
        client = self.get_client(admin=True)
 
468
        role = client.roles.create(name=test_role)
 
469
        self.assertEquals(role.name, test_role)
 
470
 
 
471
        role = client.roles.get(role=role.id)
 
472
        self.assertEquals(role.name, test_role)
 
473
 
 
474
        client.roles.delete(role=role.id)
 
475
 
 
476
        self.assertRaises(client_exceptions.NotFound,
 
477
                          client.roles.delete,
 
478
                          role=role.id)
 
479
        self.assertRaises(client_exceptions.NotFound,
 
480
                          client.roles.get,
 
481
                          role=role.id)
 
482
 
 
483
    def test_role_get_404(self):
 
484
        from keystoneclient import exceptions as client_exceptions
 
485
        client = self.get_client(admin=True)
 
486
        self.assertRaises(client_exceptions.NotFound,
 
487
                          client.roles.get,
 
488
                          role=uuid.uuid4().hex)
 
489
 
 
490
    def test_role_delete_404(self):
 
491
        from keystoneclient import exceptions as client_exceptions
 
492
        client = self.get_client(admin=True)
 
493
        self.assertRaises(client_exceptions.NotFound,
 
494
                          client.roles.delete,
 
495
                          role=uuid.uuid4().hex)
 
496
 
 
497
    def test_role_list_404(self):
 
498
        from keystoneclient import exceptions as client_exceptions
 
499
        client = self.get_client(admin=True)
 
500
        self.assertRaises(client_exceptions.NotFound,
 
501
                          client.roles.roles_for_user,
 
502
                          user=uuid.uuid4().hex,
 
503
                          tenant=uuid.uuid4().hex)
 
504
        self.assertRaises(client_exceptions.NotFound,
 
505
                          client.roles.roles_for_user,
 
506
                          user=self.user_foo['id'],
 
507
                          tenant=uuid.uuid4().hex)
 
508
        self.assertRaises(client_exceptions.NotFound,
 
509
                          client.roles.roles_for_user,
 
510
                          user=uuid.uuid4().hex,
 
511
                          tenant=self.tenant_bar['id'])
 
512
 
 
513
    def test_role_list(self):
 
514
        client = self.get_client(admin=True)
 
515
        roles = client.roles.list()
 
516
        # TODO(devcamcar): This assert should be more specific.
 
517
        self.assertTrue(len(roles) > 0)
 
518
 
 
519
    def test_ec2_credential_crud(self):
 
520
        client = self.get_client()
 
521
        creds = client.ec2.list(user_id=self.user_foo['id'])
 
522
        self.assertEquals(creds, [])
 
523
 
 
524
        cred = client.ec2.create(user_id=self.user_foo['id'],
 
525
                                 tenant_id=self.tenant_bar['id'])
 
526
        creds = client.ec2.list(user_id=self.user_foo['id'])
 
527
        self.assertEquals(creds, [cred])
 
528
 
 
529
        got = client.ec2.get(user_id=self.user_foo['id'], access=cred.access)
 
530
        self.assertEquals(cred, got)
 
531
 
 
532
        client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
 
533
        creds = client.ec2.list(user_id=self.user_foo['id'])
 
534
        self.assertEquals(creds, [])
 
535
 
 
536
    def test_ec2_credentials_create_404(self):
 
537
        from keystoneclient import exceptions as client_exceptions
 
538
        client = self.get_client()
 
539
        self.assertRaises(client_exceptions.NotFound,
 
540
                          client.ec2.create,
 
541
                          user_id=uuid.uuid4().hex,
 
542
                          tenant_id=self.tenant_bar['id'])
 
543
        self.assertRaises(client_exceptions.NotFound,
 
544
                          client.ec2.create,
 
545
                          user_id=self.user_foo['id'],
 
546
                          tenant_id=uuid.uuid4().hex)
 
547
 
 
548
    def test_ec2_credentials_delete_404(self):
 
549
        from keystoneclient import exceptions as client_exceptions
 
550
        client = self.get_client()
 
551
        self.assertRaises(client_exceptions.NotFound,
 
552
                          client.ec2.delete,
 
553
                          user_id=uuid.uuid4().hex,
 
554
                          access=uuid.uuid4().hex)
 
555
 
 
556
    def test_ec2_credentials_get_404(self):
 
557
        from keystoneclient import exceptions as client_exceptions
 
558
        client = self.get_client()
 
559
        self.assertRaises(client_exceptions.NotFound,
 
560
                          client.ec2.get,
 
561
                          user_id=uuid.uuid4().hex,
 
562
                          access=uuid.uuid4().hex)
 
563
 
 
564
    def test_ec2_credentials_list_404(self):
 
565
        from keystoneclient import exceptions as client_exceptions
 
566
        client = self.get_client()
 
567
        self.assertRaises(client_exceptions.NotFound,
 
568
                          client.ec2.list,
 
569
                          user_id=uuid.uuid4().hex)
 
570
 
 
571
    def test_ec2_credentials_list_user_forbidden(self):
 
572
        from keystoneclient import exceptions as client_exceptions
 
573
 
 
574
        two = self.get_client(self.user_two)
 
575
        self.assertRaises(client_exceptions.Forbidden, two.ec2.list,
 
576
                          user_id=self.user_foo['id'])
 
577
 
 
578
    def test_ec2_credentials_get_user_forbidden(self):
 
579
        from keystoneclient import exceptions as client_exceptions
 
580
 
 
581
        foo = self.get_client()
 
582
        cred = foo.ec2.create(user_id=self.user_foo['id'],
 
583
                              tenant_id=self.tenant_bar['id'])
 
584
 
 
585
        two = self.get_client(self.user_two)
 
586
        self.assertRaises(client_exceptions.Forbidden, two.ec2.get,
 
587
                          user_id=self.user_foo['id'], access=cred.access)
 
588
 
 
589
        foo.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
 
590
 
 
591
    def test_ec2_credentials_delete_user_forbidden(self):
 
592
        from keystoneclient import exceptions as client_exceptions
 
593
 
 
594
        foo = self.get_client()
 
595
        cred = foo.ec2.create(user_id=self.user_foo['id'],
 
596
                              tenant_id=self.tenant_bar['id'])
 
597
 
 
598
        two = self.get_client(self.user_two)
 
599
        self.assertRaises(client_exceptions.Forbidden, two.ec2.delete,
 
600
                          user_id=self.user_foo['id'], access=cred.access)
 
601
 
 
602
        foo.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
 
603
 
 
604
    def test_service_create_and_delete(self):
 
605
        from keystoneclient import exceptions as client_exceptions
 
606
 
 
607
        test_service = 'new_service'
 
608
        client = self.get_client(admin=True)
 
609
        service = client.services.create(name=test_service,
 
610
                                         service_type='test',
 
611
                                         description='test')
 
612
        self.assertEquals(service.name, test_service)
 
613
 
 
614
        service = client.services.get(id=service.id)
 
615
        self.assertEquals(service.name, test_service)
 
616
 
 
617
        client.services.delete(id=service.id)
 
618
        self.assertRaises(client_exceptions.NotFound, client.services.get,
 
619
                          id=service.id)
 
620
 
 
621
    def test_service_list(self):
 
622
        client = self.get_client(admin=True)
 
623
        test_service = 'new_service'
 
624
        service = client.services.create(name=test_service,
 
625
                                         service_type='test',
 
626
                                         description='test')
 
627
        services = client.services.list()
 
628
        # TODO(devcamcar): This assert should be more specific.
 
629
        self.assertTrue(len(services) > 0)
 
630
 
 
631
    def test_service_delete_404(self):
 
632
        from keystoneclient import exceptions as client_exceptions
 
633
        client = self.get_client(admin=True)
 
634
        self.assertRaises(client_exceptions.NotFound,
 
635
                          client.services.delete,
 
636
                          id=uuid.uuid4().hex)
 
637
 
 
638
    def test_service_get_404(self):
 
639
        from keystoneclient import exceptions as client_exceptions
 
640
        client = self.get_client(admin=True)
 
641
        self.assertRaises(client_exceptions.NotFound,
 
642
                          client.services.get,
 
643
                          id=uuid.uuid4().hex)
 
644
 
 
645
    def test_endpoint_create_404(self):
 
646
        from keystoneclient import exceptions as client_exceptions
 
647
        client = self.get_client(admin=True)
 
648
        self.assertRaises(client_exceptions.NotFound,
 
649
                          client.endpoints.create,
 
650
                          region=uuid.uuid4().hex,
 
651
                          service_id=uuid.uuid4().hex,
 
652
                          publicurl=uuid.uuid4().hex,
 
653
                          adminurl=uuid.uuid4().hex,
 
654
                          internalurl=uuid.uuid4().hex)
 
655
 
 
656
    def test_endpoint_delete_404(self):
 
657
        # the catalog backend is expected to return Not Implemented
 
658
        from keystoneclient import exceptions as client_exceptions
 
659
        client = self.get_client(admin=True)
 
660
        self.assertRaises(client_exceptions.HTTPNotImplemented,
 
661
                          client.endpoints.delete,
 
662
                          id=uuid.uuid4().hex)
 
663
 
 
664
    def test_admin_requires_adminness(self):
 
665
        from keystoneclient import exceptions as client_exceptions
 
666
        # FIXME(ja): this should be Unauthorized
 
667
        exception = client_exceptions.ClientException
 
668
 
 
669
        two = self.get_client(self.user_two, admin=True)  # non-admin user
 
670
 
 
671
        # USER CRUD
 
672
        self.assertRaises(exception,
 
673
                          two.users.list)
 
674
        self.assertRaises(exception,
 
675
                          two.users.get,
 
676
                          user=self.user_two['id'])
 
677
        self.assertRaises(exception,
 
678
                          two.users.create,
 
679
                          name='oops',
 
680
                          password='password',
 
681
                          email='oops@test.com')
 
682
        self.assertRaises(exception,
 
683
                          two.users.delete,
 
684
                          user=self.user_foo['id'])
 
685
 
 
686
        # TENANT CRUD
 
687
        self.assertRaises(exception,
 
688
                          two.tenants.list)
 
689
        self.assertRaises(exception,
 
690
                          two.tenants.get,
 
691
                          tenant_id=self.tenant_bar['id'])
 
692
        self.assertRaises(exception,
 
693
                          two.tenants.create,
 
694
                          tenant_name='oops',
 
695
                          description="shouldn't work!",
 
696
                          enabled=True)
 
697
        self.assertRaises(exception,
 
698
                          two.tenants.delete,
 
699
                          tenant=self.tenant_baz['id'])
 
700
 
 
701
        # ROLE CRUD
 
702
        self.assertRaises(exception,
 
703
                          two.roles.get,
 
704
                          role='keystone_admin')
 
705
        self.assertRaises(exception,
 
706
                          two.roles.list)
 
707
        self.assertRaises(exception,
 
708
                          two.roles.create,
 
709
                          name='oops')
 
710
        self.assertRaises(exception,
 
711
                          two.roles.delete,
 
712
                          role='keystone_admin')
 
713
 
 
714
        # TODO(ja): MEMBERSHIP CRUD
 
715
        # TODO(ja): determine what else todo
 
716
 
 
717
 
 
718
class KcMasterTestCase(CompatTestCase, KeystoneClientTests):
    # Runs the shared KeystoneClientTests plus the tests below against the
    # 'master' checkout of KEYSTONECLIENT_REPO.

    def get_checkout(self):
        # (repository, ref) pair identifying the client code under test.
        return KEYSTONECLIENT_REPO, 'master'

    def _add_tenants_to_user_foo(self, count=2):
        # Create `count` arbitrary tenants directly through identity_api and
        # add user_foo to each of them.
        for _ in range(count):
            tenant_id = uuid.uuid4().hex
            tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id}
            self.identity_api.create_tenant(tenant_id, tenant)
            self.identity_api.add_user_to_tenant(tenant_id,
                                                 self.user_foo['id'])

    def test_tenant_add_and_remove_user(self):
        # Granting a role on tenant_baz makes user_two show up in the
        # tenant's user list; revoking it makes the user disappear again.
        client = self.get_client(admin=True)
        client.roles.add_user_role(tenant=self.tenant_baz['id'],
                                   user=self.user_two['id'],
                                   role=self.role_useless['id'])
        user_refs = client.tenants.list_users(tenant=self.tenant_baz['id'])
        self.assertTrue(self.user_two['id'] in [x.id for x in user_refs])
        client.roles.remove_user_role(tenant=self.tenant_baz['id'],
                                      user=self.user_two['id'],
                                      role=self.role_useless['id'])
        user_refs = client.tenants.list_users(tenant=self.tenant_baz['id'])
        self.assertTrue(self.user_two['id'] not in [x.id for x in user_refs])

    def test_user_role_add_404(self):
        # add_user_role must raise NotFound when the tenant, the user or the
        # role (one at a time) is a random, unknown id.
        from keystoneclient import exceptions as client_exceptions
        client = self.get_client(admin=True)
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.add_user_role,
                          tenant=uuid.uuid4().hex,
                          user=self.user_foo['id'],
                          role=self.role_useless['id'])
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.add_user_role,
                          tenant=self.tenant_baz['id'],
                          user=uuid.uuid4().hex,
                          role=self.role_useless['id'])
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.add_user_role,
                          tenant=self.tenant_baz['id'],
                          user=self.user_foo['id'],
                          role=uuid.uuid4().hex)

    def test_user_role_remove_404(self):
        # remove_user_role must raise NotFound when the tenant, the user or
        # the role (one at a time) is a random, unknown id ...
        from keystoneclient import exceptions as client_exceptions
        client = self.get_client(admin=True)
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.remove_user_role,
                          tenant=uuid.uuid4().hex,
                          user=self.user_foo['id'],
                          role=self.role_useless['id'])
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.remove_user_role,
                          tenant=self.tenant_baz['id'],
                          user=uuid.uuid4().hex,
                          role=self.role_useless['id'])
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.remove_user_role,
                          tenant=self.tenant_baz['id'],
                          user=self.user_foo['id'],
                          role=uuid.uuid4().hex)
        # ... and also for this combination of fixture ids (presumably
        # because user_foo was never granted role_useless on tenant_baz --
        # confirm against default_fixtures).
        self.assertRaises(client_exceptions.NotFound,
                          client.roles.remove_user_role,
                          tenant=self.tenant_baz['id'],
                          user=self.user_foo['id'],
                          role=self.role_useless['id'])

    def test_tenant_list_marker(self):
        # Listing with marker=<first tenant id> returns the remaining
        # tenants in the same order as the unpaginated listing.
        client = self.get_client()

        # Add two arbitrary tenants to user for testing purposes
        self._add_tenants_to_user_foo(2)

        tenants = client.tenants.list()
        # presumably one fixture tenant plus the two added above -- confirm
        # against default_fixtures
        self.assertEqual(len(tenants), 3)

        tenants_marker = client.tenants.list(marker=tenants[0].id)
        self.assertEqual(len(tenants_marker), 2)
        self.assertEqual(tenants[1].name, tenants_marker[0].name)
        self.assertEqual(tenants[2].name, tenants_marker[1].name)

    def test_tenant_list_marker_not_found(self):
        # An unknown marker id must be rejected with BadRequest.
        from keystoneclient import exceptions as client_exceptions

        client = self.get_client()
        self.assertRaises(client_exceptions.BadRequest,
                          client.tenants.list, marker=uuid.uuid4().hex)

    def test_tenant_list_limit(self):
        # Listing with limit=2 returns the first two tenants of the
        # unpaginated listing, in the same order.
        client = self.get_client()

        # Add two arbitrary tenants to user for testing purposes
        self._add_tenants_to_user_foo(2)

        tenants = client.tenants.list()
        # presumably one fixture tenant plus the two added above -- confirm
        # against default_fixtures
        self.assertEqual(len(tenants), 3)

        tenants_limited = client.tenants.list(limit=2)
        self.assertEqual(len(tenants_limited), 2)
        self.assertEqual(tenants[0].name, tenants_limited[0].name)
        self.assertEqual(tenants[1].name, tenants_limited[1].name)

    def test_tenant_list_limit_bad_value(self):
        # A non-numeric or negative limit must be rejected with BadRequest.
        from keystoneclient import exceptions as client_exceptions

        client = self.get_client()
        self.assertRaises(client_exceptions.BadRequest,
                          client.tenants.list, limit='a')
        self.assertRaises(client_exceptions.BadRequest,
                          client.tenants.list, limit=-1)

    def test_roles_get_by_user(self):
        # user_foo is expected to hold at least one role on tenant_bar.
        client = self.get_client(admin=True)
        roles = client.roles.roles_for_user(user=self.user_foo['id'],
                                            tenant=self.tenant_bar['id'])
        self.assertTrue(len(roles) > 0)
 
837
 
 
838
 
 
839
class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
 
840
    def get_checkout(self):
 
841
        return KEYSTONECLIENT_REPO, 'essex-3'
 
842
 
 
843
    def test_tenant_add_and_remove_user(self):
 
844
        client = self.get_client(admin=True)
 
845
        client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'],
 
846
                                        user_id=self.user_two['id'],
 
847
                                        role_id=self.role_useless['id'])
 
848
        role_refs = client.roles.get_user_role_refs(
 
849
                user_id=self.user_two['id'])
 
850
        self.assert_(self.tenant_baz['id'] in [x.tenantId for x in role_refs])
 
851
 
 
852
        # get the "role_refs" so we get the proper id, this is how the clients
 
853
        # do it
 
854
        roleref_refs = client.roles.get_user_role_refs(
 
855
                user_id=self.user_two['id'])
 
856
        for roleref_ref in roleref_refs:
 
857
            if (roleref_ref.roleId == self.role_useless['id']
 
858
                and roleref_ref.tenantId == self.tenant_baz['id']):
 
859
                # use python's scope fall through to leave roleref_ref set
 
860
                break
 
861
 
 
862
        client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'],
 
863
                                             user_id=self.user_two['id'],
 
864
                                             role_id=roleref_ref.id)
 
865
 
 
866
        role_refs = client.roles.get_user_role_refs(
 
867
                user_id=self.user_two['id'])
 
868
        self.assert_(self.tenant_baz['id'] not in
 
869
                     [x.tenantId for x in role_refs])
 
870
 
 
871
    def test_roles_get_by_user(self):
 
872
        client = self.get_client(admin=True)
 
873
        roles = client.roles.get_user_role_refs(user_id='foo')
 
874
        self.assertTrue(len(roles) > 0)
 
875
 
 
876
    def test_role_list_404(self):
 
877
        raise nose.exc.SkipTest('N/A')
 
878
 
 
879
    def test_authenticate_and_delete_token(self):
 
880
        raise nose.exc.SkipTest('N/A')
 
881
 
 
882
    def test_user_create_update_delete(self):
 
883
        from keystoneclient import exceptions as client_exceptions
 
884
 
 
885
        test_username = 'new_user'
 
886
        client = self.get_client(admin=True)
 
887
        user = client.users.create(name=test_username,
 
888
                                   password='password',
 
889
                                   email='user1@test.com')
 
890
        self.assertEquals(user.name, test_username)
 
891
 
 
892
        user = client.users.get(user=user.id)
 
893
        self.assertEquals(user.name, test_username)
 
894
 
 
895
        user = client.users.update_email(user=user, email='user2@test.com')
 
896
        self.assertEquals(user.email, 'user2@test.com')
 
897
 
 
898
        # NOTE(termie): update_enabled doesn't return anything, probably a bug
 
899
        client.users.update_enabled(user=user, enabled=False)
 
900
        user = client.users.get(user.id)
 
901
        self.assertFalse(user.enabled)
 
902
 
 
903
        self.assertRaises(client_exceptions.Unauthorized,
 
904
                  self._client,
 
905
                  username=test_username,
 
906
                  password='password')
 
907
        client.users.update_enabled(user, True)
 
908
 
 
909
        user = client.users.update_password(user=user, password='password2')
 
910
 
 
911
        self._client(username=test_username,
 
912
                     password='password2')
 
913
 
 
914
        user = client.users.update_tenant(user=user, tenant='bar')
 
915
        # TODO(ja): once keystonelight supports default tenant
 
916
        #           when you login without specifying tenant, the
 
917
        #           token should be scoped to tenant 'bar'
 
918
 
 
919
        client.users.delete(user.id)
 
920
        self.assertRaises(client_exceptions.NotFound, client.users.get,
 
921
                          user.id)
 
922
 
 
923
    def test_user_update_404(self):
 
924
        raise nose.exc.SkipTest('N/A')
 
925
 
 
926
    def test_endpoint_create_404(self):
 
927
        raise nose.exc.SkipTest('N/A')
 
928
 
 
929
    def test_endpoint_delete_404(self):
 
930
        raise nose.exc.SkipTest('N/A')