37
38
conflict, created, etag, forbidden, no_content, not_found, okay, paginate,
39
40
from mailman.rest.preferences import Preferences
40
from mailman.rest.validator import PatchValidator, Validator
41
from mailman.rest.validator import (
42
PatchValidator, Validator, list_of_strings_validator)
41
43
from passlib.utils import generate_password as generate
42
44
from uuid import UUID
43
45
from zope.component import getUtility
47
49
# Attributes of a user which can be changed via the REST API.
48
50
class PasswordEncrypterGetterSetter(GetterSetter):
49
51
def __init__(self):
50
super(PasswordEncrypterGetterSetter, self).__init__(
51
config.password_context.encrypt)
52
super().__init__(config.password_context.encrypt)
52
53
def get(self, obj, attribute):
53
54
assert attribute == 'cleartext_password'
54
super(PasswordEncrypterGetterSetter, self).get(obj, 'password')
55
super().get(obj, 'password')
55
56
def put(self, obj, attribute, value):
56
57
assert attribute == 'cleartext_password'
57
super(PasswordEncrypterGetterSetter, self).put(obj, 'password', value)
58
super().put(obj, 'password', value)
61
class ListOfDomainOwners(GetterSetter):
62
def get(self, domain, attribute):
63
assert attribute == 'owner', (
64
'Unexpected attribute: {}'.format(attribute))
66
return owner.addresses[0].email
67
return sorted(domain.owners, key=sort_key)
69
def put(self, domain, attribute, value):
70
assert attribute == 'owner', (
71
'Unexpected attribute: {}'.format(attribute))
72
domain.add_owners(value)
76
cleartext_password=PasswordEncrypterGetterSetter(),
61
77
display_name=GetterSetter(str),
62
cleartext_password=PasswordEncrypterGetterSetter(),
78
is_server_owner=GetterSetter(as_boolean),
66
82
CREATION_FIELDS = dict(
70
_optional=('display_name', 'password'),
87
_optional=('display_name', 'password', 'is_server_owner'),
78
95
# strip that out (if it exists), then create the user, adding the password
79
96
# after the fact if successful.
80
97
password = arguments.pop('password', None)
98
is_server_owner = arguments.pop('is_server_owner', False)
82
100
user = getUtility(IUserManager).create_user(**arguments)
83
101
except ExistingAddressError as error:
88
106
# This will have to be reset since it cannot be retrieved.
89
107
password = generate(int(config.passwords.password_length))
90
108
user.password = config.password_context.encrypt(password)
109
user.is_server_owner = is_server_owner
91
110
location = path_to('users/{}'.format(user.user_id.int))
92
111
created(response, location)
105
124
# but we serialize its integer equivalent.
106
125
user_id = user.user_id.int
109
127
created_on=user.created_on,
128
is_server_owner=user.is_server_owner,
110
129
self_link=path_to('users/{}'.format(user_id)),
112
132
# Add the password attribute, only if the user has a password. Same
113
133
# with the real name. These could be None or the empty string.
114
134
if user.password:
293
313
del fields['email']
294
314
fields['user_id'] = int
295
315
fields['auto_create'] = as_boolean
296
fields['_optional'] = fields['_optional'] + ('user_id', 'auto_create')
316
fields['_optional'] = fields['_optional'] + (
317
'user_id', 'auto_create', 'is_server_owner')
298
319
validator = Validator(**fields)
299
320
arguments = validator(request)
328
349
# Process post data and check for an existing user.
329
350
fields = CREATION_FIELDS.copy()
330
351
fields['user_id'] = int
331
fields['_optional'] = fields['_optional'] + ('user_id', 'email')
352
fields['_optional'] = fields['_optional'] + (
353
'user_id', 'email', 'is_server_owner')
333
355
validator = Validator(**fields)
334
356
arguments = validator(request)
377
399
no_content(response)
379
401
forbidden(response)
405
class OwnersForDomain(_UserBase):
406
"""Owners for a particular domain."""
408
def __init__(self, domain):
409
self._domain = domain
411
def on_get(self, request, response):
412
"""/domains/<domain>/owners"""
413
if self._domain is None:
416
resource = self._make_collection(request)
417
okay(response, etag(resource))
419
def on_post(self, request, response):
420
"""POST to /domains/<domain>/owners """
421
if self._domain is None:
424
validator = Validator(
425
owner=ListOfDomainOwners(list_of_strings_validator))
427
validator.update(self._domain, request)
428
except ValueError as error:
429
bad_request(response, str(error))
431
return no_content(response)
433
def on_delete(self, request, response):
434
"""DELETE to /domains/<domain>/owners"""
435
if self._domain is None:
440
except ValueError as error:
441
bad_request(response, str(error))
444
owner.addresses[0].email
445
for owner in self._domain.owners
447
for email in owner_email:
448
self._domain.remove_owner(email)
449
return no_content(response)
452
def _get_collection(self, request):
453
"""See `CollectionMixin`."""
454
return list(self._domain.owners)