~andrewjbeach/juju-ci-tools/make-local-patcher

« back to all changes in this revision

Viewing changes to assess_user_grant_revoke.py

Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#!/usr/bin/env python
2
 
"""This testsuite is intended to test basic user permissions. Users
3
 
   can be granted read or full privileges by model. Revoking those
4
 
   privileges should remove them.
5
 
 
6
 
   A read permission user can see things such as status and
7
 
   perform read-only commands. A write permission user has
8
 
   equivalent powers as an admin"""
9
 
 
10
 
from __future__ import print_function
11
 
 
12
 
import argparse
13
 
from collections import namedtuple
14
 
import copy
15
 
import json
16
 
import logging
17
 
import random
18
 
import string
19
 
import subprocess
20
 
import sys
21
 
 
22
 
import pexpect
23
 
 
24
 
from deploy_stack import (
25
 
    BootstrapManager,
26
 
    )
27
 
from utility import (
28
 
    JujuAssertionError,
29
 
    add_basic_testing_arguments,
30
 
    configure_logging,
31
 
    temp_dir,
32
 
    )
33
 
 
34
 
__metaclass__ = type
35
 
 
36
 
 
37
 
log = logging.getLogger("assess_user_grant_revoke")
38
 
 
39
 
User = namedtuple('User', ['name', 'permissions', 'expect'])
40
 
 
41
 
 
42
 
USER_LIST_CTRL = [{"access": "superuser", "user-name": "admin",
43
 
                   "display-name": "admin"}]
44
 
USER_LIST_CTRL_READ = copy.deepcopy(USER_LIST_CTRL)
45
 
# Created user has no display name, bug 1606354
46
 
USER_LIST_CTRL_READ.append(
47
 
    {"access": "login", "user-name": "readuser"})
48
 
USER_LIST_CTRL_WRITE = copy.deepcopy(USER_LIST_CTRL)
49
 
# bug 1606354
50
 
USER_LIST_CTRL_WRITE.append({"access": "login", "user-name": "writeuser"})
51
 
USER_LIST_CTRL_ADMIN = copy.deepcopy(USER_LIST_CTRL)
52
 
# bug 1606354
53
 
USER_LIST_CTRL_ADMIN.append(
54
 
    {"access": "superuser", "user-name": "adminuser"})
55
 
SHARE_LIST_CTRL = {"admin@local": {"display-name": "admin",
56
 
                                   "access": "admin"}}
57
 
SHARE_LIST_CTRL_READ = copy.deepcopy(SHARE_LIST_CTRL)
58
 
SHARE_LIST_CTRL_READ["readuser@local"] = {"access": "read"}
59
 
SHARE_LIST_CTRL_WRITE = copy.deepcopy(SHARE_LIST_CTRL)
60
 
SHARE_LIST_CTRL_WRITE["writeuser@local"] = {"access": "write"}
61
 
SHARE_LIST_CTRL_ADMIN = copy.deepcopy(SHARE_LIST_CTRL)
62
 
SHARE_LIST_CTRL_ADMIN["adminuser@local"] = {"access": "admin"}
63
 
 
64
 
 
65
 
def assert_equal(found, expected):
66
 
    found = sorted(found)
67
 
    expected = sorted(expected)
68
 
    if found != expected:
69
 
        raise JujuAssertionError(
70
 
            'Found: {}\nExpected: {}'.format(found, expected))
71
 
 
72
 
 
73
 
def list_users(client):
74
 
    """Test listing all the users"""
75
 
    users_list = json.loads(client.get_juju_output('list-users', '--format',
76
 
                                                   'json', include_e=False))
77
 
    for user in users_list:
78
 
        user.pop("date-created", None)
79
 
        user.pop("last-connection", None)
80
 
    return users_list
81
 
 
82
 
 
83
 
def list_shares(client):
84
 
    """Test listing users' shares"""
85
 
    model_data = json.loads(
86
 
        client.get_juju_output(
87
 
            'show-model', '--format', 'json', include_e=False))
88
 
    share_list = model_data[client.model_name]['users']
89
 
    for key, value in share_list.iteritems():
90
 
        value.pop("last-connection", None)
91
 
    return share_list
92
 
 
93
 
 
94
 
def show_user(client):
95
 
    """Test showing a user's status"""
96
 
    user_status = json.loads(client.get_juju_output('show-user', '--format',
97
 
                                                    'json', include_e=False))
98
 
    user_status.pop("date-created", None)
99
 
    user_status.pop("last-connection", None)
100
 
    return user_status
101
 
 
102
 
 
103
 
def assert_read_model(client, permission, has_permission):
104
 
    """Test if the user has or doesn't have the read permission"""
105
 
    log.info('Checking read model acl {}'.format(client.env.user_name))
106
 
    if has_permission:
107
 
        try:
108
 
            client.show_status()
109
 
        except subprocess.CalledProcessError:
110
 
            raise JujuAssertionError(
111
 
                'FAIL User could not check status with {} permission'.format(
112
 
                    permission))
113
 
    else:
114
 
        try:
115
 
            client.show_status()
116
 
        except subprocess.CalledProcessError:
117
 
            pass
118
 
        else:
119
 
            raise JujuAssertionError(
120
 
                'FAIL {} checked status without {} permission'.format(
121
 
                    client.env.user_name, permission))
122
 
    log.info('PASS {} read acl'.format(client.env.user_name))
123
 
 
124
 
 
125
 
def assert_write_model(client, permission, has_permission):
126
 
    """Test if the user has or doesn't have the write permission"""
127
 
    log.info('Checking write model acl {}'.format(client.env.user_name))
128
 
    if has_permission:
129
 
        try:
130
 
            tags = '"{}={}"'.format(client.env.user_name, permission)
131
 
            client.set_env_option('resource-tags', tags)
132
 
        except subprocess.CalledProcessError:
133
 
            raise JujuAssertionError(
134
 
                'FAIL {} could not set-model-config with {} permission'.format(
135
 
                    client.env.user_name, permission))
136
 
    else:
137
 
        try:
138
 
            tags = '"{}=no-{}"'.format(client.env.user_name, permission)
139
 
            client.set_env_option('resource-tags', tags)
140
 
        except subprocess.CalledProcessError:
141
 
            pass
142
 
        else:
143
 
            raise JujuAssertionError(
144
 
                'FAIL User set model-config without {} permission'.format(
145
 
                    permission))
146
 
    log.info('PASS {} write model acl'.format(client.env.user_name))
147
 
 
148
 
 
149
 
def assert_admin_model(controller_client, client, permission, has_permission):
150
 
    """Test if the user has or doesn't have the admin permission"""
151
 
    log.info('Checking admin acl with {}'.format(client.env.user_name))
152
 
    code = ''.join(random.choice(
153
 
        string.ascii_letters + string.digits) for _ in xrange(4))
154
 
    new_user = permission + code
155
 
    log.info('Adding user {} for test'.format(new_user))
156
 
    controller_client.add_user_perms(new_user, permissions="read")
157
 
    if has_permission:
158
 
        try:
159
 
            client.grant(new_user, permission="write")
160
 
        except subprocess.CalledProcessError:
161
 
            raise JujuAssertionError(
162
 
                'FAIL {} could not grant write acl to user'.format(
163
 
                    client.env.user_name, permission))
164
 
    else:
165
 
        try:
166
 
            client.grant(new_user, permission="write")
167
 
        except subprocess.CalledProcessError:
168
 
            log.info('Correctly rejected {} use of grant'.format(
169
 
                client.env.user_name))
170
 
        else:
171
 
            raise JujuAssertionError(
172
 
                'FAIL {} granted access without {} permission'.format(
173
 
                    client.env.user_name, permission))
174
 
    # Remove the user to ensure list-users is sane.
175
 
    log.info('Removing user {} after test'.format(new_user))
176
 
    controller_client.remove_user(new_user)
177
 
    log.info('PASS {} admin acl'.format(client.env.user_name))
178
 
 
179
 
 
180
 
def assert_user_permissions(user, user_client, controller_client):
181
 
    """Test if users' permissions are within expectations"""
182
 
    expect = iter(user.expect)
183
 
    permission = user.permissions
184
 
    assert_read_model(user_client, permission, expect.next())
185
 
    assert_write_model(user_client, permission, expect.next())
186
 
    assert_admin_model(
187
 
        controller_client, user_client, permission, expect.next())
188
 
 
189
 
    log.info("Revoking {} permission from {}".format(
190
 
        user.permissions, user.name))
191
 
    controller_client.revoke(user.name, permissions=user.permissions)
192
 
    log.info('Revoke accepted')
193
 
 
194
 
    assert_read_model(user_client, permission, expect.next())
195
 
    assert_write_model(user_client, permission, expect.next())
196
 
    assert_admin_model(
197
 
        controller_client, user_client, permission, expect.next())
198
 
 
199
 
 
200
 
def assert_change_password(client, user):
201
 
    """Test changing user's password"""
202
 
    log.info('Checking change-user-password')
203
 
    try:
204
 
        child = client.expect('change-user-password', (user.name,),
205
 
                              include_e=False)
206
 
        child.expect('(?i)password')
207
 
        child.sendline(user.name + '_password_2')
208
 
        child.expect('(?i)password')
209
 
        child.sendline(user.name + '_password_2')
210
 
        child.expect(pexpect.EOF)
211
 
    except pexpect.TIMEOUT:
212
 
        raise JujuAssertionError(
213
 
            'FAIL Changing user password failed: pexpect session timed out')
214
 
    if child.isalive():
215
 
        raise JujuAssertionError(
216
 
            'FAIL Changing user password failed: pexpect session still alive')
217
 
    child.close()
218
 
    if child.exitstatus != 0:
219
 
        raise JujuAssertionError(
220
 
            'FAIL Changing user password failed: '
221
 
            'pexpect process exited with {}'.format(child.exitstatus))
222
 
    log.info('PASS change-user-password')
223
 
 
224
 
 
225
 
def assert_disable_enable(controller_client, user):
226
 
    """Test disabling and enabling users"""
227
 
    log.info('Checking disabled {}'.format(user.name))
228
 
    controller_client.disable_user(user.name)
229
 
    log.info('Disabled {}'.format(user.name))
230
 
    user_list = list_users(controller_client)
231
 
    log.info('Checking list-users {}'.format(user.name))
232
 
    assert_equal(user_list, USER_LIST_CTRL)
233
 
    log.info('Checking enable {}'.format(user.name))
234
 
    controller_client.enable_user(user.name)
235
 
    log.info('Enabled {}'.format(user.name))
236
 
    user_list = list_users(controller_client)
237
 
    log.info('Checking list-users {}'.format(user.name))
238
 
    assert_equal(user_list, USER_LIST_CTRL_WRITE)
239
 
 
240
 
 
241
 
def assert_logout_login(controller_client, user_client, user, fake_home):
242
 
    """Test users' login and logout"""
243
 
    user_client.logout()
244
 
    log.info('Checking list-users after logout')
245
 
    user_list = list_users(controller_client)
246
 
    assert_equal(user_list, USER_LIST_CTRL_READ)
247
 
    log.info('Checking list-users after login')
248
 
    username = user.name
249
 
    controller_name = '{}_controller'.format(username)
250
 
    client = controller_client.create_cloned_environment(
251
 
        fake_home, controller_name, user.name)
252
 
    if client.env.config['type'] == 'lxd':
253
 
        client.juju(
254
 
            'login', (user.name, '-c', controller_name), include_e=False)
255
 
    else:
256
 
        try:
257
 
            child = client.expect('login', (user.name, '-c', controller_name),
258
 
                                  include_e=False)
259
 
            # This scenario is pre-macaroon.
260
 
            # See https://bugs.launchpad.net/bugs/1621532
261
 
            child.expect('(?i)password')
262
 
            child.sendline(user.name + '_password_2')
263
 
            # end non-macaroon.
264
 
            child.expect(pexpect.EOF)
265
 
            if child.isalive():
266
 
                raise JujuAssertionError(
267
 
                    'FAIL Login user: pexpect session still alive')
268
 
            child.close()
269
 
            if child.exitstatus != 0:
270
 
                raise JujuAssertionError(
271
 
                    'FAIL Login user: pexpect process exited with {}'.format(
272
 
                        child.exitstatus))
273
 
        except pexpect.TIMEOUT:
274
 
            raise JujuAssertionError(
275
 
                'FAIL Login user failed: pexpect session timed out')
276
 
    log.info('PASS logout and login')
277
 
    return client
278
 
 
279
 
 
280
 
def assert_read_user(controller_client, user):
281
 
    """Assess the operations of read user"""
282
 
    log.info('Checking read {}'.format(user.name))
283
 
    with temp_dir() as fake_home:
284
 
        user_client = controller_client.register_user(
285
 
            user, fake_home)
286
 
        log.info('Checking list-users {}'.format(user.name))
287
 
        user_list = list_users(controller_client)
288
 
        assert_equal(user_list, USER_LIST_CTRL_READ)
289
 
        log.info('Checking list-shares {}'.format(user.name))
290
 
        share_list = list_shares(controller_client)
291
 
        assert_equal(share_list, SHARE_LIST_CTRL_READ)
292
 
        assert_change_password(user_client, user)
293
 
        user_client = assert_logout_login(
294
 
            controller_client, user_client, user, fake_home)
295
 
        assert_user_permissions(user, user_client, controller_client)
296
 
        controller_client.remove_user(user.name)
297
 
    log.info('PASS read {}'.format(user.name))
298
 
 
299
 
 
300
 
def assert_write_user(controller_client, user):
301
 
    """Assess the operations of write user"""
302
 
    log.info('Checking write {}'.format(user.name))
303
 
    with temp_dir() as fake_home:
304
 
        user_client = controller_client.register_user(
305
 
            user, fake_home)
306
 
        user_client.env.user_name = user.name
307
 
        log.info('Checking list-users {}'.format(user.name))
308
 
        user_list = list_users(controller_client)
309
 
        assert_equal(user_list, USER_LIST_CTRL_WRITE)
310
 
        log.info('Checking list-shares {}'.format(user.name))
311
 
        share_list = list_shares(controller_client)
312
 
        assert_equal(share_list, SHARE_LIST_CTRL_WRITE)
313
 
        assert_disable_enable(controller_client, user)
314
 
        assert_user_permissions(user, user_client, controller_client)
315
 
        controller_client.remove_user(user.name)
316
 
    log.info('PASS write {}'.format(user.name))
317
 
 
318
 
 
319
 
def assert_admin_user(controller_client, user):
320
 
    """Assess the operations of admin user"""
321
 
    log.info('Checking admin {}'.format(user.name))
322
 
    with temp_dir() as fake_home:
323
 
        user_client = controller_client.register_user(
324
 
            user, fake_home)
325
 
        controller_client.grant(user_name=user.name, permission="superuser")
326
 
        user_client.env.user_name = user.name
327
 
        log.info('Checking list-users {}'.format(user.name))
328
 
        user_list = list_users(controller_client)
329
 
        assert_equal(user_list, USER_LIST_CTRL_ADMIN)
330
 
        log.info('Checking list-shares {}'.format(user.name))
331
 
        share_list = list_shares(controller_client)
332
 
        assert_equal(share_list, SHARE_LIST_CTRL_ADMIN)
333
 
        assert_user_permissions(user, user_client, controller_client)
334
 
        controller_client.remove_user(user.name)
335
 
    log.info('PASS admin {}'.format(user.name))
336
 
 
337
 
 
338
 
def assess_user_grant_revoke(controller_client):
339
 
    """Test multi-users functionality"""
340
 
    log.info('STARTING grant/revoke permissions')
341
 
    controller_client.env.user_name = 'admin'
342
 
    log.info("Creating Users: readuser, writeuser, adminuser")
343
 
    read_user = User('readuser', 'read',
344
 
                     [True, False, False, False, False, False])
345
 
    write_user = User('writeuser', 'write',
346
 
                      [True, True, False, True, False, False])
347
 
    admin_user = User('adminuser', 'admin',
348
 
                      [True, True, True, True, True, True])
349
 
    log.info('Checking list-users admin')
350
 
    user_list = list_users(controller_client)
351
 
    assert_equal(user_list, USER_LIST_CTRL)
352
 
    log.info('Checking list-shares admin')
353
 
    share_list = list_shares(controller_client)
354
 
    assert_equal(share_list, SHARE_LIST_CTRL)
355
 
    log.info('Checking show-user admin')
356
 
    user_status = show_user(controller_client)
357
 
    assert_equal(user_status, USER_LIST_CTRL[0])
358
 
    assert_read_user(controller_client, read_user)
359
 
    assert_write_user(controller_client, write_user)
360
 
    assert_admin_user(controller_client, admin_user)
361
 
    log.info('SUCCESS grant/revoke permissions')
362
 
 
363
 
 
364
 
def parse_args(argv):
365
 
    """Parse all arguments."""
366
 
    parser = argparse.ArgumentParser(
367
 
        description="Test grant and revoke permissions for users")
368
 
    add_basic_testing_arguments(parser)
369
 
    return parser.parse_args(argv)
370
 
 
371
 
 
372
 
def main(argv=None):
373
 
    args = parse_args(argv)
374
 
    configure_logging(logging.DEBUG)
375
 
    bs_manager = BootstrapManager.from_args(args)
376
 
    with bs_manager.booted_context(args.upload_tools):
377
 
        assess_user_grant_revoke(bs_manager.client)
378
 
    return 0
379
 
 
380
 
if __name__ == '__main__':
381
 
    sys.exit(main())