~abompard/mailman/subpolicy

« back to all changes in this revision

Viewing changes to src/mailman/rest/users.py

  • Committer: Barry Warsaw
  • Date: 2015-04-07 02:06:28 UTC
  • mfrom: (7313.2.6 lp1423756)
  • mto: This revision was merged to the branch mainline in revision 7323.
  • Revision ID: barry@list.org-20150407020628-fkwphij7to9lc8gy
 * Domains now have a list of owners, which are ``IUser`` objects, instead of
   the single ``contact_address`` they used to have.  ``IUser`` objects now
   also have an ``is_server_owner`` flag (defaulting to False) to indicate
   whether they have superuser privileges.  Given by Abhilash Raj, with fixes
   and refinements by Barry Warsaw.  (LP: #1423756)

 * Domains can now optionally be created with owners; domain owners can be
   added after the fact; domain owners can be deleted.  Also, users now have
   an ``is_server_owner`` flag as part of their representation, which defaults
   to False, and can be PUT and PATCH'd.  Given by Abhilash Raj, with fixes
   and refinements by Barry Warsaw.  (LP: #1423756)

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
    'AddressUser',
23
23
    'AllUsers',
24
24
    'Login',
 
25
    'OwnersForDomain',
25
26
    ]
26
27
 
27
28
 
37
38
    conflict, created, etag, forbidden, no_content, not_found, okay, paginate,
38
39
    path_to)
39
40
from mailman.rest.preferences import Preferences
40
 
from mailman.rest.validator import PatchValidator, Validator
 
41
from mailman.rest.validator import (
 
42
    PatchValidator, Validator, list_of_strings_validator)
41
43
from passlib.utils import generate_password as generate
42
44
from uuid import UUID
43
45
from zope.component import getUtility
47
49
# Attributes of a user which can be changed via the REST API.
48
50
class PasswordEncrypterGetterSetter(GetterSetter):
49
51
    def __init__(self):
50
 
        super(PasswordEncrypterGetterSetter, self).__init__(
51
 
            config.password_context.encrypt)
 
52
        super().__init__(config.password_context.encrypt)
52
53
    def get(self, obj, attribute):
53
54
        assert attribute == 'cleartext_password'
54
 
        super(PasswordEncrypterGetterSetter, self).get(obj, 'password')
 
55
        super().get(obj, 'password')
55
56
    def put(self, obj, attribute, value):
56
57
        assert attribute == 'cleartext_password'
57
 
        super(PasswordEncrypterGetterSetter, self).put(obj, 'password', value)
 
58
        super().put(obj, 'password', value)
 
59
 
 
60
 
 
61
class ListOfDomainOwners(GetterSetter):
    """Getter/setter for the ``owner`` attribute of a domain."""

    def get(self, domain, attribute):
        """Return ``domain.owners`` sorted by each owner's first email.

        Only the ``owner`` attribute is handled; any other attribute name
        trips the assertion.
        """
        assert attribute == 'owner', (
            'Unexpected attribute: {}'.format(attribute))
        # Order the owners by the email of the first address on each one.
        return sorted(
            domain.owners, key=lambda owner: owner.addresses[0].email)

    def put(self, domain, attribute, value):
        """Hand ``value`` to ``domain.add_owners()``.

        Only the ``owner`` attribute is handled; any other attribute name
        trips the assertion.
        """
        assert attribute == 'owner', (
            'Unexpected attribute: {}'.format(attribute))
        domain.add_owners(value)
58
73
 
59
74
 
60
75
ATTRIBUTES = dict(
 
76
    cleartext_password=PasswordEncrypterGetterSetter(),
61
77
    display_name=GetterSetter(str),
62
 
    cleartext_password=PasswordEncrypterGetterSetter(),
 
78
    is_server_owner=GetterSetter(as_boolean),
63
79
    )
64
80
 
65
81
 
66
82
CREATION_FIELDS = dict(
 
83
    display_name=str,
67
84
    email=str,
68
 
    display_name=str,
 
85
    is_server_owner=bool,
69
86
    password=str,
70
 
    _optional=('display_name', 'password'),
 
87
    _optional=('display_name', 'password', 'is_server_owner'),
71
88
    )
72
89
 
73
90
 
78
95
    # strip that out (if it exists), then create the user, adding the password
79
96
    # after the fact if successful.
80
97
    password = arguments.pop('password', None)
 
98
    is_server_owner = arguments.pop('is_server_owner', False)
81
99
    try:
82
100
        user = getUtility(IUserManager).create_user(**arguments)
83
101
    except ExistingAddressError as error:
88
106
        # This will have to be reset since it cannot be retrieved.
89
107
        password = generate(int(config.passwords.password_length))
90
108
    user.password = config.password_context.encrypt(password)
 
109
    user.is_server_owner = is_server_owner
91
110
    location = path_to('users/{}'.format(user.user_id.int))
92
111
    created(response, location)
93
112
    return user
105
124
        # but we serialize its integer equivalent.
106
125
        user_id = user.user_id.int
107
126
        resource = dict(
108
 
            user_id=user_id,
109
127
            created_on=user.created_on,
 
128
            is_server_owner=user.is_server_owner,
110
129
            self_link=path_to('users/{}'.format(user_id)),
111
 
            )
 
130
            user_id=user_id,
 
131
        )
112
132
        # Add the password attribute, only if the user has a password.  Same
113
133
        # with the real name.  These could be None or the empty string.
114
134
        if user.password:
293
313
        del fields['email']
294
314
        fields['user_id'] = int
295
315
        fields['auto_create'] = as_boolean
296
 
        fields['_optional'] = fields['_optional'] + ('user_id', 'auto_create')
 
316
        fields['_optional'] = fields['_optional'] + (
 
317
            'user_id', 'auto_create', 'is_server_owner')
297
318
        try:
298
319
            validator = Validator(**fields)
299
320
            arguments = validator(request)
328
349
        # Process post data and check for an existing user.
329
350
        fields = CREATION_FIELDS.copy()
330
351
        fields['user_id'] = int
331
 
        fields['_optional'] = fields['_optional'] + ('user_id', 'email')
 
352
        fields['_optional'] = fields['_optional'] + (
 
353
            'user_id', 'email', 'is_server_owner')
332
354
        try:
333
355
            validator = Validator(**fields)
334
356
            arguments = validator(request)
377
399
            no_content(response)
378
400
        else:
379
401
            forbidden(response)
 
402
 
 
403
 
 
404
 
 
405
class OwnersForDomain(_UserBase):
    """Owners for a particular domain."""

    def __init__(self, domain):
        # `domain` may be None; every handler below answers 404 in that case.
        self._domain = domain

    def on_get(self, request, response):
        """/domains/<domain>/owners"""
        if self._domain is None:
            not_found(response)
            return
        resource = self._make_collection(request)
        okay(response, etag(resource))

    def on_post(self, request, response):
        """POST to /domains/<domain>/owners """
        if self._domain is None:
            not_found(response)
            return
        # The `owner` field is validated as a list of strings and then
        # applied to the domain through ListOfDomainOwners.put().
        validator = Validator(
            owner=ListOfDomainOwners(list_of_strings_validator))
        try:
            validator.update(self._domain, request)
        except ValueError as error:
            bad_request(response, str(error))
            return
        return no_content(response)

    def on_delete(self, request, response):
        """DELETE to /domains/<domain>/owners"""
        if self._domain is None:
            not_found(response)
            # Stop here, as on_get/on_post do; otherwise the code below
            # would dereference `self._domain` while it is None.
            return
        try:
            # No arguments.
            Validator()(request)
        except ValueError as error:
            bad_request(response, str(error))
            return
        # Collect the emails first so that owners are not removed while
        # `self._domain.owners` is still being iterated.
        owner_email = [
            owner.addresses[0].email
            for owner in self._domain.owners
            ]
        for email in owner_email:
            self._domain.remove_owner(email)
        return no_content(response)

    @paginate
    def _get_collection(self, request):
        """See `CollectionMixin`."""
        return list(self._domain.owners)