~juju-qa/juju-ci-tools/trunk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
#!/usr/bin/env python
"""This test suite is intended to test basic user permissions. Users
   can be granted read, write or admin privileges per model. Revoking
   those privileges should remove them.

   A user with read permission can see things such as status and
   perform read-only commands. A user with write permission can also
   change the model, and a user with admin permission can additionally
   manage other users."""

from __future__ import print_function

import argparse
from collections import namedtuple
import copy
import json
import logging
import random
import string
import subprocess
import sys

import pexpect

from deploy_stack import (
    BootstrapManager,
    )
from utility import (
    JujuAssertionError,
    add_basic_testing_arguments,
    configure_logging,
    temp_dir,
    )

__metaclass__ = type


log = logging.getLogger("assess_user_grant_revoke")

# A test user: its name, the model permission it is granted, and the
# expected outcomes (booleans) of the permission checks run against it.
User = namedtuple('User', ('name', 'permissions', 'expect'))


# Expected `list-users` output (volatile date fields removed) when only the
# controller admin exists.
USER_LIST_CTRL = [
    {"access": "superuser", "user-name": "admin", "display-name": "admin"},
]
# Created users have no display name, bug 1606354
USER_LIST_CTRL_READ = copy.deepcopy(USER_LIST_CTRL) + [
    {"access": "login", "user-name": "readuser"},
]
# bug 1606354
USER_LIST_CTRL_WRITE = copy.deepcopy(USER_LIST_CTRL) + [
    {"access": "login", "user-name": "writeuser"},
]
# bug 1606354
USER_LIST_CTRL_ADMIN = copy.deepcopy(USER_LIST_CTRL) + [
    {"access": "superuser", "user-name": "adminuser"},
]

# Expected per-model user shares (volatile fields removed) when only the
# controller admin has access.
SHARE_LIST_CTRL = {
    "admin": {"display-name": "admin", "access": "admin"},
}
SHARE_LIST_CTRL_READ = copy.deepcopy(SHARE_LIST_CTRL)
SHARE_LIST_CTRL_READ.update({"readuser": {"access": "read"}})
SHARE_LIST_CTRL_WRITE = copy.deepcopy(SHARE_LIST_CTRL)
SHARE_LIST_CTRL_WRITE.update({"writeuser": {"access": "write"}})
SHARE_LIST_CTRL_ADMIN = copy.deepcopy(SHARE_LIST_CTRL)
SHARE_LIST_CTRL_ADMIN.update({"adminuser": {"access": "admin"}})


def _generate_random_string():
    """Return a random 10-character alphanumeric string starting with 'r'.

    The result is used for user names and passwords in this test. User
    names must begin with a letter, so a fixed letter is prefixed to nine
    random letters/digits.
    """
    alphabet = string.ascii_letters + string.digits
    # Prefix (not join with) the letter: 'r'.join(...) would only put 'r'
    # *between* the random characters, leaving a possibly-numeric first one.
    return 'r' + ''.join(random.choice(alphabet) for _ in range(9))


def assert_equal(found, expected):
    """Raise JujuAssertionError unless found and expected hold the same data.

    Mappings are compared whole (keys and values). Any other iterable is
    compared ignoring element order.

    :param found: The value produced by the command under test.
    :param expected: The value the test expects.
    :raises JujuAssertionError: When the two values differ.
    """
    def order_insensitive(items):
        try:
            return sorted(items)
        except TypeError:
            # Elements such as dicts are not orderable on Python 3; sort on
            # a canonical serialisation instead.
            return sorted(items, key=lambda item: json.dumps(
                item, sort_keys=True, default=repr))

    if isinstance(found, dict) or isinstance(expected, dict):
        # sorted() on a mapping yields only its keys, which would silently
        # ignore the values; compare mappings directly.
        matches = found == expected
    else:
        found = order_insensitive(found)
        expected = order_insensitive(expected)
        matches = found == expected
    if not matches:
        raise JujuAssertionError(
            'Found: {}\nExpected: {}'.format(found, expected))


def assert_command_fails(check_callable, command_type, permission):
    """Check that calling check_callable fails with CalledProcessError.

    :param check_callable: Zero-argument callable running the command.
    :param command_type: Kind of operation, used in the failure message.
    :param permission: Permission under test, used in the failure message.
    :raises JujuAssertionError: When the callable completes without error.
    """
    try:
        check_callable()
    except subprocess.CalledProcessError:
        # The command was rejected, which is what the caller expects.
        return
    raise JujuAssertionError(
        'FAIL User performed {} operation with '
        'permission {}'.format(command_type, permission))


def assert_command_succeeds(check_callable, command_type, permission):
    """Check that calling check_callable does not raise CalledProcessError.

    :param check_callable: Zero-argument callable running the command.
    :param command_type: Kind of operation, used in the failure message.
    :param permission: Permission under test, used in the failure message.
    :raises JujuAssertionError: When the callable raises CalledProcessError.
    """
    command_failed = False
    try:
        check_callable()
    except subprocess.CalledProcessError:
        command_failed = True
    if command_failed:
        raise JujuAssertionError(
            'FAIL User unable to perform {} operation with '
            'permission {}'.format(command_type, permission))


def list_users(client):
    """Return the parsed `list-users` JSON output without volatile fields.

    The "date-created" and "last-connection" entries are dropped from every
    user so the result can be compared against fixed expected values.
    """
    raw_output = client.get_juju_output(
        'list-users', '--format', 'json', include_e=False)
    users = json.loads(raw_output)
    for entry in users:
        for volatile_field in ("date-created", "last-connection"):
            entry.pop(volatile_field, None)
    return users


def list_shares(client):
    """Return the users sharing the client's model, without volatile fields.

    Parses the `show-model` JSON output, takes the 'users' mapping of the
    entry named client.model_name and drops each user's "last-connection"
    entry so the result can be compared against fixed expected values.

    :param client: Client providing get_juju_output() and model_name.
    :return: Mapping of user name to that user's share details.
    """
    model_data = json.loads(
        client.get_juju_output(
            'show-model', '--format', 'json', include_e=False))
    share_list = model_data[client.model_name]['users']
    # Only the values are needed; values() also works on Python 3, unlike
    # the Python-2-only iteritems().
    for share in share_list.values():
        share.pop("last-connection", None)
    return share_list


def show_user(client):
    """Return the parsed `show-user` JSON output without volatile fields.

    The "date-created" and "last-connection" entries are removed so the
    result can be compared against fixed expected values.
    """
    raw_output = client.get_juju_output(
        'show-user', '--format', 'json', include_e=False)
    status = json.loads(raw_output)
    for volatile_field in ("date-created", "last-connection"):
        status.pop(volatile_field, None)
    return status


def assess_read_operations(client, permission, has_permission):
    """Run the read-only commands and check each behaves as expected.

    :param client: Client of the user whose access is being checked.
    :param permission: Permission under test, used in failure messages.
    :param has_permission: True if the commands should succeed, False if
        they should fail.
    """
    if has_permission:
        verify = assert_command_succeeds
    else:
        verify = assert_command_fails

    read_commands = (
        client.show_status,
        lambda: client.juju('show-model', (), include_e=False),
    )
    for read_command in read_commands:
        verify(read_command, 'read', permission)


def assess_write_operations(client, permission, has_permission):
    """Run the write commands and check each behaves as expected.

    The write operation used is setting the 'resource-tags' option to a
    tag built from the client's user name and the permission.

    :param client: Client of the user whose access is being checked.
    :param permission: Permission under test, used in the tag and in
        failure messages.
    :param has_permission: True if the commands should succeed, False if
        they should fail.
    """
    resource_tags = '"{}={}"'.format(client.env.user_name, permission)
    if has_permission:
        verify = assert_command_succeeds
    else:
        verify = assert_command_fails

    write_commands = [
        lambda: client.set_env_option('resource-tags', resource_tags),
    ]
    for write_command in write_commands:
        verify(write_command, 'write', permission)


def assess_admin_operations(client, permission, has_permission):
    """Run the user-management commands and check each behaves as expected.

    Two throwaway user names are generated; the commands add, grant and
    remove those users through the given client.

    :param client: Client of the user whose access is being checked.
    :param permission: Permission under test, used in failure messages.
    :param has_permission: True if the commands should succeed, False if
        they should fail.
    """
    # Throwaway user names for the user client to interact with
    new_read_user = _generate_random_string()
    new_admin_user = _generate_random_string()

    if has_permission:
        verify = assert_command_succeeds
    else:
        verify = assert_command_fails

    admin_commands = [
        lambda: client.add_user(new_read_user),
        lambda: client.grant(new_read_user, permission="read"),
        lambda: client.remove_user(new_read_user),
        lambda: client.add_user_perms(new_admin_user, permissions="admin"),
        lambda: client.remove_user(new_admin_user),
    ]
    for admin_command in admin_commands:
        verify(admin_command, 'admin', permission)


def assert_read_model(client, permission, has_permission):
    """Check whether the client's user can perform read operations.

    has_permission states the expected outcome: True if the read commands
    should succeed, False if they should fail.
    """
    user_name = client.env.user_name
    log.info('Checking read model acl {}'.format(user_name))
    assess_read_operations(client, permission, has_permission)
    log.info('PASS {} read acl'.format(user_name))


def assert_write_model(client, permission, has_permission):
    """Check whether the client's user can perform write operations.

    has_permission states the expected outcome: True if the write commands
    should succeed, False if they should fail.
    """
    user_name = client.env.user_name
    log.info('Checking write model acl {}'.format(user_name))
    assess_write_operations(client, permission, has_permission)
    log.info('PASS {} write model acl'.format(user_name))


def assert_admin_model(controller_client, client, permission, has_permission):
    """Check whether the client's user can perform admin operations.

    has_permission states the expected outcome: True if the admin commands
    should succeed, False if they should fail. controller_client is
    accepted but not used by this check.
    """
    user_name = client.env.user_name
    log.info('Checking admin acl with {}'.format(user_name))
    assess_admin_operations(client, permission, has_permission)
    log.info('PASS {} admin acl'.format(user_name))


def assert_user_permissions(user, user_client, controller_client):
    """Check a user's access before and after its permission is revoked.

    user.expect supplies six expected outcomes, consumed in order: read,
    write and admin access while the permission is granted, then read,
    write and admin access after it has been revoked.

    :param user: User tuple giving the name, permission and expectations.
    :param user_client: Client acting as the user under test.
    :param controller_client: Client used to revoke the permission.
    """
    expect = iter(user.expect)
    permission = user.permissions

    def assert_model_access():
        # Consumes the next three expectations: read, write, admin.
        # The builtin next() works on both Python 2 and 3, unlike the
        # Python-2-only iterator.next() method.
        assert_read_model(user_client, permission, next(expect))
        assert_write_model(user_client, permission, next(expect))
        assert_admin_model(
            controller_client, user_client, permission, next(expect))

    assert_model_access()

    log.info("Revoking {} permission from {}".format(
        user.permissions, user.name))
    controller_client.revoke(user.name, permissions=user.permissions)
    log.info('Revoke accepted')

    assert_model_access()


def assert_change_password(client, user, password):
    """Check that the user's password can be changed interactively.

    Runs `change-user-password` for user.name through client.expect() and
    answers the two password prompts with the given password.

    :param client: Client used to run the command.
    :param user: User tuple; only its name is used.
    :param password: The new password to enter at both prompts.
    :raises JujuAssertionError: When a prompt is not seen before pexpect
        times out.
    """
    log.info('Checking change-user-password')
    # Start the session outside the try block: the TIMEOUT handler reports
    # on `child`, which would be unbound if starting the session itself
    # raised, masking the real error.
    child = client.expect('change-user-password', (user.name,),
                          include_e=False)
    try:
        # The prompt is matched case-insensitively; the password is asked
        # for twice (presumably entry and confirmation).
        child.expect('(?i)password')
        child.sendline(password)
        child.expect('(?i)password')
        child.sendline(password)
        client._end_pexpect_session(child)
    except pexpect.TIMEOUT:
        log.error('Buffer: {}'.format(child.buffer))
        log.error('Before: {}'.format(child.before))
        raise JujuAssertionError(
            'FAIL Changing user password failed: '
            'pexpect process exited with {}'.format(child.exitstatus))
    log.info('PASS change-user-password')


def assert_disable_enable(controller_client, user):
    """Check that disabling and re-enabling a user is shown by list-users.

    After disabling, list-users is expected to equal USER_LIST_CTRL; after
    enabling, it is expected to equal the listing taken at the start.
    """
    users_before = list_users(controller_client)

    log.info('Checking disabled {}'.format(user.name))
    controller_client.disable_user(user.name)
    log.info('Disabled {}'.format(user.name))
    users_when_disabled = list_users(controller_client)
    log.info('Checking list-users {}'.format(user.name))
    assert_equal(users_when_disabled, USER_LIST_CTRL)

    log.info('Checking enable {}'.format(user.name))
    controller_client.enable_user(user.name)
    log.info('Enabled {}'.format(user.name))
    users_when_enabled = list_users(controller_client)
    log.info('Checking list-users {}'.format(user.name))
    assert_equal(users_when_enabled, users_before)


def assert_user_status(client, user, expected_users, expected_shares):
    """Check list-users and the model's shares against expected values.

    user is only used to name the user in the log messages.
    """
    log.info('Checking list-users {}'.format(user.name))
    assert_equal(list_users(client), expected_users)

    log.info('Checking list-shares {}'.format(user.name))
    assert_equal(list_shares(client), expected_shares)


def assert_logout_login(controller_client, user_client, user,
                        fake_home, password, expected_users):
    """Check list-users after the user logs out and after it logs back in.

    After logout the listing must equal expected_users; after login it must
    equal the listing taken before the logout. fake_home is accepted but
    not used here.
    """
    users_before = list_users(controller_client)

    user_client.logout()
    log.info('Checking list-users after logout')
    users_after_logout = list_users(controller_client)
    assert_equal(users_after_logout, expected_users)

    log.info('Checking list-users after login')
    user_client.login_user(user.name, password)
    users_after_login = list_users(controller_client)
    assert_equal(users_after_login, users_before)


def _assert_user(controller_client, user, label, expected_users,
                 expected_shares, controller_permission=None):
    """Run the full set of checks shared by the read/write/admin users.

    Registers the user in a temporary home directory, then checks the
    user/share listings, password change, logout/login, permissions before
    and after revoke, and disable/enable, and finally removes the user.

    :param controller_client: Admin client used to manage the user.
    :param user: User tuple describing the user under test.
    :param label: Word used in the log messages ('read', 'write', 'admin').
    :param expected_users: Expected list-users output for this user.
    :param expected_shares: Expected model shares for this user.
    :param controller_permission: If not None, a permission granted to the
        user through controller_client right after registration.
    """
    log.info('Checking {} {}'.format(label, user.name))
    with temp_dir() as fake_home:
        user_client = controller_client.register_user(
            user, fake_home)
        if controller_permission is not None:
            controller_client.grant(
                user_name=user.name, permission=controller_permission)
        user_client.env.user_name = user.name
        assert_user_status(controller_client, user,
                           expected_users, expected_shares)

        password = _generate_random_string()
        assert_change_password(user_client, user, password)
        assert_logout_login(controller_client, user_client,
                            user, fake_home, password, expected_users)
        assert_user_permissions(user, user_client, controller_client)
        assert_disable_enable(controller_client, user)
        controller_client.remove_user(user.name)
    log.info('PASS {} {}'.format(label, user.name))


def assert_read_user(controller_client, user):
    """Assess the operations of read user"""
    _assert_user(controller_client, user, 'read',
                 USER_LIST_CTRL_READ, SHARE_LIST_CTRL_READ)


def assert_write_user(controller_client, user):
    """Assess the operations of write user"""
    _assert_user(controller_client, user, 'write',
                 USER_LIST_CTRL_WRITE, SHARE_LIST_CTRL_WRITE)


def assert_admin_user(controller_client, user):
    """Assess the operations of admin user

    Unlike the read and write users, the admin user is also granted
    "superuser" through the controller client after registration.
    """
    _assert_user(controller_client, user, 'admin',
                 USER_LIST_CTRL_ADMIN, SHARE_LIST_CTRL_ADMIN,
                 controller_permission="superuser")


def assert_controller(controller_client):
    """Check the admin's view of users, shares and itself before any
    test user exists."""
    log.info('Checking list-users admin')
    assert_equal(list_users(controller_client), USER_LIST_CTRL)

    log.info('Checking list-shares admin')
    assert_equal(list_shares(controller_client), SHARE_LIST_CTRL)

    log.info('Checking show-user admin')
    admin_status = show_user(controller_client)
    expected_admin = USER_LIST_CTRL[0]
    assert_equal(admin_status, expected_admin)


def assess_user_grant_revoke(controller_client):
    """Run the grant/revoke checks for a read, a write and an admin user.

    Each User carries six expected outcomes: read, write and admin access
    while the permission is granted, then the same three after revoke.
    """
    log.info('STARTING grant/revoke permissions')
    controller_client.env.user_name = 'admin'
    log.info("Creating Users: readuser, writeuser, adminuser")
    read_user = User(
        name='readuser', permissions='read',
        expect=[True, False, False, False, False, False])
    write_user = User(
        name='writeuser', permissions='write',
        expect=[True, True, False, True, False, False])
    admin_user = User(
        name='adminuser', permissions='admin',
        expect=[True, True, True, True, True, True])

    # The controller is checked first, while only the admin exists.
    assert_controller(controller_client)

    # Then each type of user, in turn.
    user_checks = (
        (assert_read_user, read_user),
        (assert_write_user, write_user),
        (assert_admin_user, admin_user),
    )
    for check_user, user in user_checks:
        check_user(controller_client, user)

    log.info('SUCCESS grant/revoke permissions')


def parse_args(argv):
    """Build the command-line parser and parse argv with it.

    The parser carries the arguments added by
    add_basic_testing_arguments().
    """
    arg_parser = argparse.ArgumentParser(
        description="Test grant and revoke permissions for users")
    add_basic_testing_arguments(arg_parser)
    parsed = arg_parser.parse_args(argv)
    return parsed


def main(argv=None):
    """Bootstrap an environment and run the grant/revoke assessment.

    :param argv: Command-line arguments; None lets argparse use sys.argv.
    :return: 0, used as the process exit status.
    """
    options = parse_args(argv)
    configure_logging(logging.DEBUG)
    manager = BootstrapManager.from_args(options)
    with manager.booted_context(options.upload_tools):
        assess_user_grant_revoke(manager.client)
    return 0


if __name__ == '__main__':
    sys.exit(main())