~ubuntu-branches/ubuntu/quantal/nova/quantal-proposed

« back to all changes in this revision

Viewing changes to nova/tests/test_auth.py

  • Committer: Package Import Robot
  • Author(s): Chuck Short
  • Date: 2012-08-16 14:04:11 UTC
  • mto: This revision was merged to the branch mainline in revision 84.
  • Revision ID: package-import@ubuntu.com-20120816140411-0mr4n241wmk30t9l
Tags: upstream-2012.2~f3
ImportĀ upstreamĀ versionĀ 2012.2~f3

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
 
 
3
 
# Copyright 2010 United States Government as represented by the
4
 
# Administrator of the National Aeronautics and Space Administration.
5
 
# All Rights Reserved.
6
 
#
7
 
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
8
 
#    not use this file except in compliance with the License. You may obtain
9
 
#    a copy of the License at
10
 
#
11
 
#         http://www.apache.org/licenses/LICENSE-2.0
12
 
#
13
 
#    Unless required by applicable law or agreed to in writing, software
14
 
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15
 
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16
 
#    License for the specific language governing permissions and limitations
17
 
#    under the License.
18
 
 
19
 
import unittest
20
 
 
21
 
from nova.auth import fakeldap
22
 
from nova.auth import manager
23
 
from nova import exception
24
 
from nova import flags
25
 
from nova.openstack.common import log as logging
26
 
from nova import test
27
 
 
28
 
FLAGS = flags.FLAGS
29
 
LOG = logging.getLogger(__name__)
30
 
 
31
 
 
32
 
class user_generator(object):
33
 
    def __init__(self, manager, **user_state):
34
 
        if 'name' not in user_state:
35
 
            user_state['name'] = 'test1'
36
 
        self.manager = manager
37
 
        self.user = manager.create_user(**user_state)
38
 
 
39
 
    def __enter__(self):
40
 
        return self.user
41
 
 
42
 
    def __exit__(self, value, type, trace):
43
 
        self.manager.delete_user(self.user)
44
 
 
45
 
 
46
 
class project_generator(object):
47
 
    def __init__(self, manager, **project_state):
48
 
        if 'name' not in project_state:
49
 
            project_state['name'] = 'testproj'
50
 
        if 'manager_user' not in project_state:
51
 
            project_state['manager_user'] = 'test1'
52
 
        self.manager = manager
53
 
        self.project = manager.create_project(**project_state)
54
 
 
55
 
    def __enter__(self):
56
 
        return self.project
57
 
 
58
 
    def __exit__(self, value, type, trace):
59
 
        self.manager.delete_project(self.project)
60
 
 
61
 
 
62
 
class user_and_project_generator(object):
63
 
    def __init__(self, manager, user_state=None, project_state=None):
64
 
        if not user_state:
65
 
            user_state = {}
66
 
        if not project_state:
67
 
            project_state = {}
68
 
 
69
 
        self.manager = manager
70
 
        if 'name' not in user_state:
71
 
            user_state['name'] = 'test1'
72
 
        if 'name' not in project_state:
73
 
            project_state['name'] = 'testproj'
74
 
        if 'manager_user' not in project_state:
75
 
            project_state['manager_user'] = 'test1'
76
 
        self.user = manager.create_user(**user_state)
77
 
        self.project = manager.create_project(**project_state)
78
 
 
79
 
    def __enter__(self):
80
 
        return (self.user, self.project)
81
 
 
82
 
    def __exit__(self, value, type, trace):
83
 
        self.manager.delete_user(self.user)
84
 
        self.manager.delete_project(self.project)
85
 
 
86
 
 
87
 
class _AuthManagerBaseTestCase(test.TestCase):
88
 
 
89
 
    user_not_found_type = exception.UserNotFound
90
 
 
91
 
    def setUp(self):
92
 
        super(_AuthManagerBaseTestCase, self).setUp()
93
 
        self.flags(auth_driver=self.auth_driver,
94
 
                compute_driver='nova.virt.fake.FakeDriver')
95
 
        self.manager = manager.AuthManager(new=True)
96
 
        self.manager.mc.cache = {}
97
 
 
98
 
    def test_create_and_find_user(self):
99
 
        with user_generator(self.manager):
100
 
            self.assert_(self.manager.get_user('test1'))
101
 
 
102
 
    def test_create_and_find_with_properties(self):
103
 
        with user_generator(self.manager, name="herbert", secret="classified",
104
 
                        access="private-party"):
105
 
            u = self.manager.get_user('herbert')
106
 
            self.assertEqual('herbert', u.id)
107
 
            self.assertEqual('herbert', u.name)
108
 
            self.assertEqual('classified', u.secret)
109
 
            self.assertEqual('private-party', u.access)
110
 
 
111
 
    def test_create_user_twice(self):
112
 
        self.manager.create_user('test-1')
113
 
        self.assertRaises(exception.UserExists, self.manager.create_user,
114
 
                          'test-1')
115
 
 
116
 
    def test_signature_is_valid(self):
117
 
        with user_generator(self.manager, name='admin', secret='admin',
118
 
                            access='admin'):
119
 
            with project_generator(self.manager, name="admin",
120
 
                                   manager_user='admin'):
121
 
                accesskey = 'admin:admin'
122
 
                expected_result = (self.manager.get_user('admin'),
123
 
                                   self.manager.get_project('admin'))
124
 
                # captured sig and query string using boto 1.9b/euca2ools 1.2
125
 
                sig = 'd67Wzd9Bwz8xid9QU+lzWXcF2Y3tRicYABPJgrqfrwM='
126
 
                auth_params = {'AWSAccessKeyId': 'admin:admin',
127
 
                               'Action': 'DescribeAvailabilityZones',
128
 
                               'SignatureMethod': 'HmacSHA256',
129
 
                               'SignatureVersion': '2',
130
 
                               'Timestamp': '2011-04-22T11:29:29',
131
 
                               'Version': '2009-11-30'}
132
 
                self.assertTrue(expected_result, self.manager.authenticate(
133
 
                        accesskey,
134
 
                        sig,
135
 
                        auth_params,
136
 
                        'GET',
137
 
                        '127.0.0.1:8773',
138
 
                        '/services/Cloud/'))
139
 
                # captured sig and query string using RightAWS 1.10.0
140
 
                sig = 'ECYLU6xdFG0ZqRVhQybPJQNJ5W4B9n8fGs6+/fuGD2c='
141
 
                auth_params = {'AWSAccessKeyId': 'admin:admin',
142
 
                               'Action': 'DescribeAvailabilityZones',
143
 
                               'SignatureMethod': 'HmacSHA256',
144
 
                               'SignatureVersion': '2',
145
 
                               'Timestamp': '2011-04-22T11:29:49.000Z',
146
 
                               'Version': '2008-12-01'}
147
 
                self.assertTrue(expected_result, self.manager.authenticate(
148
 
                        accesskey,
149
 
                        sig,
150
 
                        auth_params,
151
 
                        'GET',
152
 
                        '127.0.0.1',
153
 
                        '/services/Cloud'))
154
 
 
155
 
    def test_can_get_credentials(self):
156
 
        self.flags(auth_strategy='deprecated')
157
 
        st = {'access': 'access', 'secret': 'secret'}
158
 
        with user_and_project_generator(self.manager, user_state=st) as (u, p):
159
 
            credentials = self.manager.get_environment_rc(u, p)
160
 
            LOG.debug(credentials)
161
 
            self.assertTrue('export EC2_ACCESS_KEY="access:testproj"\n'
162
 
                            in credentials)
163
 
            self.assertTrue('export EC2_SECRET_KEY="secret"\n' in credentials)
164
 
 
165
 
    def test_can_list_users(self):
166
 
        with user_generator(self.manager):
167
 
            with user_generator(self.manager, name="test2"):
168
 
                users = self.manager.get_users()
169
 
                self.assert_(filter(lambda u: u.id == 'test1', users))
170
 
                self.assert_(filter(lambda u: u.id == 'test2', users))
171
 
                self.assert_(not filter(lambda u: u.id == 'test3', users))
172
 
 
173
 
    def test_can_add_and_remove_user_role(self):
174
 
        with user_generator(self.manager):
175
 
            self.assertFalse(self.manager.has_role('test1', 'itsec'))
176
 
            self.manager.add_role('test1', 'itsec')
177
 
            self.assertTrue(self.manager.has_role('test1', 'itsec'))
178
 
            self.manager.remove_role('test1', 'itsec')
179
 
            self.assertFalse(self.manager.has_role('test1', 'itsec'))
180
 
 
181
 
    def test_can_create_and_get_project(self):
182
 
        with user_and_project_generator(self.manager) as (u, p):
183
 
            self.assert_(self.manager.get_user('test1'))
184
 
            self.assert_(self.manager.get_user('test1'))
185
 
            self.assert_(self.manager.get_project('testproj'))
186
 
 
187
 
    def test_can_list_projects(self):
188
 
        with user_and_project_generator(self.manager):
189
 
            with project_generator(self.manager, name="testproj2"):
190
 
                projects = self.manager.get_projects()
191
 
                self.assert_(filter(lambda p: p.name == 'testproj', projects))
192
 
                self.assert_(filter(lambda p: p.name == 'testproj2', projects))
193
 
                self.assert_(not filter(lambda p: p.name == 'testproj3',
194
 
                                        projects))
195
 
 
196
 
    def test_can_create_and_get_project_with_attributes(self):
197
 
        with user_generator(self.manager):
198
 
            with project_generator(self.manager, description='A test project'):
199
 
                project = self.manager.get_project('testproj')
200
 
                self.assertEqual('A test project', project.description)
201
 
 
202
 
    def test_can_create_project_with_manager(self):
203
 
        with user_and_project_generator(self.manager) as (user, project):
204
 
            self.assertEqual('test1', project.project_manager_id)
205
 
            self.assertTrue(self.manager.is_project_manager(user, project))
206
 
 
207
 
    def test_can_create_project_twice(self):
208
 
        with user_and_project_generator(self.manager) as (user1, project):
209
 
            self.assertRaises(exception.ProjectExists,
210
 
                              self.manager.create_project, "testproj", "test1")
211
 
 
212
 
    def test_create_project_assigns_manager_to_members(self):
213
 
        with user_and_project_generator(self.manager) as (user, project):
214
 
            self.assertTrue(self.manager.is_project_member(user, project))
215
 
 
216
 
    def test_create_project_with_manager_and_members(self):
217
 
        with user_generator(self.manager, name='test2') as user2:
218
 
            with user_and_project_generator(self.manager,
219
 
                project_state={'member_users': ['test2']}) as (user1, project):
220
 
                    self.assertTrue(self.manager.is_project_member(
221
 
                        user1, project))
222
 
                    self.assertTrue(self.manager.is_project_member(
223
 
                        user2, project))
224
 
 
225
 
    def test_create_project_with_manager_and_missing_members(self):
226
 
        self.assertRaises(self.user_not_found_type,
227
 
                          self.manager.create_project, "testproj", "test1",
228
 
                          member_users="test2")
229
 
 
230
 
    def test_no_extra_project_members(self):
231
 
        with user_generator(self.manager, name='test2') as baduser:
232
 
            with user_and_project_generator(self.manager) as (user, project):
233
 
                self.assertFalse(self.manager.is_project_member(baduser,
234
 
                                                                 project))
235
 
 
236
 
    def test_no_extra_project_managers(self):
237
 
        with user_generator(self.manager, name='test2') as baduser:
238
 
            with user_and_project_generator(self.manager) as (user, project):
239
 
                self.assertFalse(self.manager.is_project_manager(baduser,
240
 
                                                                 project))
241
 
 
242
 
    def test_can_add_user_to_project(self):
243
 
        with user_generator(self.manager, name='test2') as user:
244
 
            with user_and_project_generator(self.manager) as (_user, project):
245
 
                self.manager.add_to_project(user, project)
246
 
                project = self.manager.get_project('testproj')
247
 
                self.assertTrue(self.manager.is_project_member(user, project))
248
 
 
249
 
    def test_can_remove_user_from_project(self):
250
 
        with user_generator(self.manager, name='test2') as user:
251
 
            with user_and_project_generator(self.manager) as (_user, project):
252
 
                self.manager.add_to_project(user, project)
253
 
                project = self.manager.get_project('testproj')
254
 
                self.assertTrue(self.manager.is_project_member(user, project))
255
 
                self.manager.remove_from_project(user, project)
256
 
                project = self.manager.get_project('testproj')
257
 
                self.assertFalse(self.manager.is_project_member(user, project))
258
 
 
259
 
    def test_can_add_remove_user_with_role(self):
260
 
        with user_generator(self.manager, name='test2') as user:
261
 
            with user_and_project_generator(self.manager) as (_user, project):
262
 
                # NOTE(todd): after modifying users you must reload project
263
 
                self.manager.add_to_project(user, project)
264
 
                project = self.manager.get_project('testproj')
265
 
                self.manager.add_role(user, 'developer', project)
266
 
                self.assertTrue(self.manager.is_project_member(user, project))
267
 
                self.manager.remove_from_project(user, project)
268
 
                project = self.manager.get_project('testproj')
269
 
                self.assertFalse(self.manager.has_role(user, 'developer',
270
 
                                                       project))
271
 
                self.assertFalse(self.manager.is_project_member(user, project))
272
 
 
273
 
    def test_adding_role_to_project_is_ignored_unless_added_to_user(self):
274
 
        with user_and_project_generator(self.manager) as (user, project):
275
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
276
 
            self.manager.add_role(user, 'sysadmin', project)
277
 
            # NOTE(todd): it will still show up in get_user_roles(u, project)
278
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
279
 
            self.manager.add_role(user, 'sysadmin')
280
 
            self.assertTrue(self.manager.has_role(user, 'sysadmin', project))
281
 
 
282
 
    def test_add_user_role_doesnt_infect_project_roles(self):
283
 
        with user_and_project_generator(self.manager) as (user, project):
284
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
285
 
            self.manager.add_role(user, 'sysadmin')
286
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
287
 
 
288
 
    def test_can_list_user_roles(self):
289
 
        with user_and_project_generator(self.manager) as (user, project):
290
 
            self.manager.add_role(user, 'sysadmin')
291
 
            roles = self.manager.get_user_roles(user)
292
 
            self.assertTrue('sysadmin' in roles)
293
 
            self.assertFalse('netadmin' in roles)
294
 
 
295
 
    def test_can_list_project_roles(self):
296
 
        with user_and_project_generator(self.manager) as (user, project):
297
 
            self.manager.add_role(user, 'sysadmin')
298
 
            self.manager.add_role(user, 'sysadmin', project)
299
 
            self.manager.add_role(user, 'netadmin', project)
300
 
            project_roles = self.manager.get_user_roles(user, project)
301
 
            self.assertTrue('sysadmin' in project_roles)
302
 
            self.assertTrue('netadmin' in project_roles)
303
 
            # has role should be false user-level role is missing
304
 
            self.assertFalse(self.manager.has_role(user, 'netadmin', project))
305
 
 
306
 
    def test_can_remove_user_roles(self):
307
 
        with user_and_project_generator(self.manager) as (user, project):
308
 
            self.manager.add_role(user, 'sysadmin')
309
 
            self.assertTrue(self.manager.has_role(user, 'sysadmin'))
310
 
            self.manager.remove_role(user, 'sysadmin')
311
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin'))
312
 
 
313
 
    def test_removing_user_role_hides_it_from_project(self):
314
 
        with user_and_project_generator(self.manager) as (user, project):
315
 
            self.manager.add_role(user, 'sysadmin')
316
 
            self.manager.add_role(user, 'sysadmin', project)
317
 
            self.assertTrue(self.manager.has_role(user, 'sysadmin', project))
318
 
            self.manager.remove_role(user, 'sysadmin')
319
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
320
 
 
321
 
    def test_can_remove_project_role_but_keep_user_role(self):
322
 
        with user_and_project_generator(self.manager) as (user, project):
323
 
            self.manager.add_role(user, 'sysadmin')
324
 
            self.manager.add_role(user, 'sysadmin', project)
325
 
            self.assertTrue(self.manager.has_role(user, 'sysadmin'))
326
 
            self.manager.remove_role(user, 'sysadmin', project)
327
 
            self.assertFalse(self.manager.has_role(user, 'sysadmin', project))
328
 
            self.assertTrue(self.manager.has_role(user, 'sysadmin'))
329
 
 
330
 
    def test_can_retrieve_project_by_user(self):
331
 
        with user_and_project_generator(self.manager) as (user, project):
332
 
            self.assertEqual(1, len(self.manager.get_projects('test1')))
333
 
 
334
 
    def test_can_modify_project(self):
335
 
        with user_and_project_generator(self.manager):
336
 
            with user_generator(self.manager, name='test2'):
337
 
                self.manager.modify_project('testproj', 'test2', 'new desc')
338
 
                project = self.manager.get_project('testproj')
339
 
                self.assertEqual('test2', project.project_manager_id)
340
 
                self.assertEqual('new desc', project.description)
341
 
 
342
 
    def test_can_call_modify_project_but_do_nothing(self):
343
 
        with user_and_project_generator(self.manager):
344
 
            self.manager.modify_project('testproj')
345
 
            project = self.manager.get_project('testproj')
346
 
            self.assertEqual('test1', project.project_manager_id)
347
 
            self.assertEqual('testproj', project.description)
348
 
 
349
 
    def test_modify_project_adds_new_manager(self):
350
 
        with user_and_project_generator(self.manager):
351
 
            with user_generator(self.manager, name='test2'):
352
 
                self.manager.modify_project('testproj', 'test2', 'new desc')
353
 
                project = self.manager.get_project('testproj')
354
 
                self.assertTrue('test2' in project.member_ids)
355
 
 
356
 
    def test_create_project_with_missing_user(self):
357
 
        with user_generator(self.manager):
358
 
            self.assertRaises(self.user_not_found_type,
359
 
                              self.manager.create_project, 'testproj',
360
 
                              'not_real')
361
 
 
362
 
    def test_can_delete_project(self):
363
 
        with user_generator(self.manager):
364
 
            self.manager.create_project('testproj', 'test1')
365
 
            self.assert_(self.manager.get_project('testproj'))
366
 
            self.manager.delete_project('testproj')
367
 
            projectlist = self.manager.get_projects()
368
 
            self.assert_(not filter(lambda p: p.name == 'testproj',
369
 
                         projectlist))
370
 
 
371
 
    def test_can_delete_user(self):
372
 
        self.manager.create_user('test1')
373
 
        self.assert_(self.manager.get_user('test1'))
374
 
        self.manager.delete_user('test1')
375
 
        userlist = self.manager.get_users()
376
 
        self.assert_(not filter(lambda u: u.id == 'test1', userlist))
377
 
 
378
 
    def test_can_modify_users(self):
379
 
        with user_generator(self.manager):
380
 
            self.manager.modify_user('test1', 'access', 'secret', True)
381
 
            user = self.manager.get_user('test1')
382
 
            self.assertEqual('access', user.access)
383
 
            self.assertEqual('secret', user.secret)
384
 
            self.assertTrue(user.is_admin())
385
 
 
386
 
    def test_can_call_modify_user_but_do_nothing(self):
387
 
        with user_generator(self.manager):
388
 
            old_user = self.manager.get_user('test1')
389
 
            self.manager.modify_user('test1')
390
 
            user = self.manager.get_user('test1')
391
 
            self.assertEqual(old_user.access, user.access)
392
 
            self.assertEqual(old_user.secret, user.secret)
393
 
            self.assertEqual(old_user.is_admin(), user.is_admin())
394
 
 
395
 
    def test_get_nonexistent_user_raises_notfound_exception(self):
396
 
        self.assertRaises(exception.NotFound,
397
 
                          self.manager.get_user,
398
 
                          'joeblow')
399
 
 
400
 
 
401
 
class AuthManagerLdapTestCase(_AuthManagerBaseTestCase):
402
 
    auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
403
 
    user_not_found_type = exception.LDAPUserNotFound
404
 
 
405
 
    def test_reconnect_on_server_failure(self):
406
 
        self.manager.get_users()
407
 
        fakeldap.server_fail = True
408
 
        try:
409
 
            self.assertRaises(fakeldap.SERVER_DOWN, self.manager.get_users)
410
 
        finally:
411
 
            fakeldap.server_fail = False
412
 
        self.manager.get_users()
413
 
 
414
 
 
415
 
class AuthManagerDbTestCase(_AuthManagerBaseTestCase):
416
 
    auth_driver = 'nova.auth.dbdriver.DbDriver'
417
 
    user_not_found_type = exception.UserNotFound
418
 
 
419
 
 
420
 
if __name__ == "__main__":
421
 
    # TODO(anotherjesse): Implement use_fake as an option
422
 
    unittest.main()