~mailman-coders/mailman/3.0

« back to all changes in this revision

Viewing changes to src/mailman/app/tests/test_registrar.py

  • Committer: Barry Warsaw
  • Date: 2015-04-15 14:05:35 UTC
  • mfrom: (7313.1.28 subpolicy-2)
  • Revision ID: barry@list.org-20150415140535-csj5cobfg9ocf0ry
 * Mailing list subscription policy workflow has been completely rewritten.
   It now properly supports email verification and subscription confirmation
   by the user, and approval by the moderator using unique tokens.
   ``IMailingList`` objects now have a ``subscription_policy`` attribute.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
"""Test email address registration."""
19
19
 
20
20
__all__ = [
21
 
    'TestEmailValidation',
22
 
    'TestRegistration',
 
21
    'TestRegistrar',
23
22
    ]
24
23
 
25
24
 
26
25
import unittest
27
26
 
28
27
from mailman.app.lifecycle import create_list
29
 
from mailman.interfaces.address import InvalidEmailAddressError
 
28
from mailman.interfaces.mailinglist import SubscriptionPolicy
30
29
from mailman.interfaces.pending import IPendings
31
 
from mailman.interfaces.registrar import ConfirmationNeededEvent, IRegistrar
32
 
from mailman.testing.helpers import event_subscribers
 
30
from mailman.interfaces.registrar import IRegistrar
 
31
from mailman.interfaces.usermanager import IUserManager
33
32
from mailman.testing.layers import ConfigLayer
 
33
from mailman.utilities.datetime import now
34
34
from zope.component import getUtility
35
35
 
36
36
 
37
37
 
38
 
class TestEmailValidation(unittest.TestCase):
39
 
    """Test basic email validation."""
40
 
 
41
 
    layer = ConfigLayer
42
 
 
43
 
    def setUp(self):
44
 
        self.registrar = getUtility(IRegistrar)
45
 
        self.mlist = create_list('alpha@example.com')
46
 
 
47
 
    def test_empty_string_is_invalid(self):
48
 
        self.assertRaises(InvalidEmailAddressError,
49
 
                          self.registrar.register, self.mlist,
50
 
                          '')
51
 
 
52
 
    def test_no_spaces_allowed(self):
53
 
        self.assertRaises(InvalidEmailAddressError,
54
 
                          self.registrar.register, self.mlist,
55
 
                          'some name@example.com')
56
 
 
57
 
    def test_no_angle_brackets(self):
58
 
        self.assertRaises(InvalidEmailAddressError,
59
 
                          self.registrar.register, self.mlist,
60
 
                          '<script>@example.com')
61
 
 
62
 
    def test_ascii_only(self):
63
 
        self.assertRaises(InvalidEmailAddressError,
64
 
                          self.registrar.register, self.mlist,
65
 
                          '\xa0@example.com')
66
 
 
67
 
    def test_domain_required(self):
68
 
        self.assertRaises(InvalidEmailAddressError,
69
 
                          self.registrar.register, self.mlist,
70
 
                          'noatsign')
71
 
 
72
 
    def test_full_domain_required(self):
73
 
        self.assertRaises(InvalidEmailAddressError,
74
 
                          self.registrar.register, self.mlist,
75
 
                          'nodom@ain')
76
 
 
77
 
 
78
 
 
79
 
class TestRegistration(unittest.TestCase):
 
38
class TestRegistrar(unittest.TestCase):
80
39
    """Test registration."""
81
40
 
82
41
    layer = ConfigLayer
83
42
 
84
43
    def setUp(self):
85
 
        self.registrar = getUtility(IRegistrar)
86
 
        self.mlist = create_list('alpha@example.com')
87
 
 
88
 
    def test_confirmation_event_received(self):
89
 
        # Registering an email address generates an event.
90
 
        def capture_event(event):
91
 
            self.assertIsInstance(event, ConfirmationNeededEvent)
92
 
        with event_subscribers(capture_event):
93
 
            self.registrar.register(self.mlist, 'anne@example.com')
94
 
 
95
 
    def test_event_mlist(self):
96
 
        # The event has a reference to the mailing list being subscribed to.
97
 
        def capture_event(event):
98
 
            self.assertIs(event.mlist, self.mlist)
99
 
        with event_subscribers(capture_event):
100
 
            self.registrar.register(self.mlist, 'anne@example.com')
101
 
 
102
 
    def test_event_pendable(self):
103
 
        # The event has an IPendable which contains additional information.
104
 
        def capture_event(event):
105
 
            pendable = event.pendable
106
 
            self.assertEqual(pendable['type'], 'registration')
107
 
            self.assertEqual(pendable['email'], 'anne@example.com')
108
 
            # The key is present, but the value is None.
109
 
            self.assertIsNone(pendable['display_name'])
110
 
            # The default is regular delivery.
111
 
            self.assertEqual(pendable['delivery_mode'], 'regular')
112
 
            self.assertEqual(pendable['list_id'], 'alpha.example.com')
113
 
        with event_subscribers(capture_event):
114
 
            self.registrar.register(self.mlist, 'anne@example.com')
115
 
 
116
 
    def test_token(self):
117
 
        # Registering the email address returns a token, and this token links
118
 
        # back to the pendable.
119
 
        captured_events = []
120
 
        def capture_event(event):
121
 
            captured_events.append(event)
122
 
        with event_subscribers(capture_event):
123
 
            token = self.registrar.register(self.mlist, 'anne@example.com')
124
 
        self.assertEqual(len(captured_events), 1)
125
 
        event = captured_events[0]
126
 
        self.assertEqual(event.token, token)
127
 
        pending = getUtility(IPendings).confirm(token)
128
 
        self.assertEqual(pending, event.pendable)
 
44
        self._mlist = create_list('ant@example.com')
 
45
        self._registrar = IRegistrar(self._mlist)
 
46
        self._pendings = getUtility(IPendings)
 
47
        self._anne = getUtility(IUserManager).create_address(
 
48
            'anne@example.com')
 
49
 
 
50
    def test_unique_token(self):
 
51
        # Registering a subscription request provides a unique token associated
 
52
        # with a pendable.
 
53
        self.assertEqual(self._pendings.count, 0)
 
54
        token = self._registrar.register(self._anne)
 
55
        self.assertIsNotNone(token)
 
56
        self.assertEqual(self._pendings.count, 1)
 
57
        record = self._pendings.confirm(token, expunge=False)
 
58
        self.assertEqual(record['list_id'], self._mlist.list_id)
 
59
        self.assertEqual(record['address'], 'anne@example.com')
 
60
 
 
61
    def test_no_token(self):
 
62
        # Registering a subscription request where no confirmation or
 
63
        # moderation steps are needed, leaves us with no token, since there's
 
64
        # nothing more to do.
 
65
        self._mlist.subscription_policy = SubscriptionPolicy.open
 
66
        self._anne.verified_on = now()
 
67
        token = self._registrar.register(self._anne)
 
68
        self.assertIsNone(token)
 
69
        record = self._pendings.confirm(token, expunge=False)
 
70
        self.assertIsNone(record)
 
71
 
 
72
    def test_is_subscribed(self):
 
73
        # Where no confirmation or moderation steps are needed, registration
 
74
        # happens immediately.
 
75
        self._mlist.subscription_policy = SubscriptionPolicy.open
 
76
        self._anne.verified_on = now()
 
77
        status = self._registrar.register(self._anne)
 
78
        self.assertIsNone(status)
 
79
        member = self._mlist.regular_members.get_member('anne@example.com')
 
80
        self.assertEqual(member.address, self._anne)
 
81
 
 
82
    def test_no_such_token(self):
 
83
        # Given a token which is not in the database, a LookupError is raised.
 
84
        self._registrar.register(self._anne)
 
85
        self.assertRaises(LookupError, self._registrar.confirm, 'not-a-token')
 
86
 
 
87
    def test_confirm_because_verify(self):
 
88
        # We have a subscription request which requires the user to confirm
 
89
        # (because she does not have a verified address), but not the moderator
 
90
        # to approve.  Running the workflow gives us a token.  Confirming the
 
91
        # token subscribes the user.
 
92
        self._mlist.subscription_policy = SubscriptionPolicy.open
 
93
        token = self._registrar.register(self._anne)
 
94
        self.assertIsNotNone(token)
 
95
        member = self._mlist.regular_members.get_member('anne@example.com')
 
96
        self.assertIsNone(member)
 
97
        # Now confirm the subscription.
 
98
        self._registrar.confirm(token)
 
99
        member = self._mlist.regular_members.get_member('anne@example.com')
 
100
        self.assertEqual(member.address, self._anne)
 
101
 
 
102
    def test_confirm_because_confirm(self):
 
103
        # We have a subscription request which requires the user to confirm
 
104
        # (because of list policy), but not the moderator to approve.  Running
 
105
        # the workflow gives us a token.  Confirming the token subscribes the
 
106
        # user.
 
107
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
 
108
        self._anne.verified_on = now()
 
109
        token = self._registrar.register(self._anne)
 
110
        self.assertIsNotNone(token)
 
111
        member = self._mlist.regular_members.get_member('anne@example.com')
 
112
        self.assertIsNone(member)
 
113
        # Now confirm the subscription.
 
114
        self._registrar.confirm(token)
 
115
        member = self._mlist.regular_members.get_member('anne@example.com')
 
116
        self.assertEqual(member.address, self._anne)
 
117
 
 
118
    def test_confirm_because_moderation(self):
 
119
        # We have a subscription request which requires the moderator to
 
120
        # approve.  Running the workflow gives us a token.  Confirming the
 
121
        # token subscribes the user.
 
122
        self._mlist.subscription_policy = SubscriptionPolicy.moderate
 
123
        self._anne.verified_on = now()
 
124
        token = self._registrar.register(self._anne)
 
125
        self.assertIsNotNone(token)
 
126
        member = self._mlist.regular_members.get_member('anne@example.com')
 
127
        self.assertIsNone(member)
 
128
        # Now confirm the subscription.
 
129
        self._registrar.confirm(token)
 
130
        member = self._mlist.regular_members.get_member('anne@example.com')
 
131
        self.assertEqual(member.address, self._anne)
 
132
 
 
133
    def test_confirm_because_confirm_then_moderation(self):
 
134
        # We have a subscription request which requires the user to confirm
 
135
        # (because of list policy) and the moderator to
 
136
        # approve.  Running the workflow gives us a token.  Confirming the
 
137
        # token runs the workflow a little farther, but still gives us a
 
138
        # token.  Confirming again subscribes the user.
 
139
        self._mlist.subscription_policy = \
 
140
          SubscriptionPolicy.confirm_then_moderate
 
141
        self._anne.verified_on = now()
 
142
        # Runs until subscription confirmation.
 
143
        token = self._registrar.register(self._anne)
 
144
        self.assertIsNotNone(token)
 
145
        member = self._mlist.regular_members.get_member('anne@example.com')
 
146
        self.assertIsNone(member)
 
147
        # Now confirm the subscription, and wait for the moderator to approve
 
148
        # the subscription.  She is still not subscribed.
 
149
        new_token = self._registrar.confirm(token)
 
150
        # The new token, used for the moderator to approve the request, is not
 
151
        # the same as the old token.
 
152
        self.assertNotEqual(new_token, token)
 
153
        member = self._mlist.regular_members.get_member('anne@example.com')
 
154
        self.assertIsNone(member)
 
155
        # Confirm once more, this time as the moderator approving the
 
156
        # subscription.  Now she's a member.
 
157
        self._registrar.confirm(new_token)
 
158
        member = self._mlist.regular_members.get_member('anne@example.com')
 
159
        self.assertEqual(member.address, self._anne)
 
160
 
 
161
    def test_confirm_then_moderate_with_different_tokens(self):
 
162
        # Ensure that the confirmation token the user sees when they have to
 
163
        # confirm their subscription is different than the token the moderator
 
164
        # sees when they approve the subscription.  This prevents the user
 
165
        # from using a replay attack to subvert moderator approval.
 
166
        self._mlist.subscription_policy = \
 
167
          SubscriptionPolicy.confirm_then_moderate
 
168
        self._anne.verified_on = now()
 
169
        # Runs until subscription confirmation.
 
170
        token = self._registrar.register(self._anne)
 
171
        self.assertIsNotNone(token)
 
172
        member = self._mlist.regular_members.get_member('anne@example.com')
 
173
        self.assertIsNone(member)
 
174
        # Now confirm the subscription, and wait for the moderator to approve
 
175
        # the subscription.  She is still not subscribed.
 
176
        new_token = self._registrar.confirm(token)
 
177
        # The token is not None because the user has not yet been subscribed
 
178
        # to the mailing list.
 
179
        self.assertIsNotNone(new_token)
 
180
        member = self._mlist.regular_members.get_member('anne@example.com')
 
181
        self.assertIsNone(member)
 
182
        # The new token is different than the old token.
 
183
        self.assertNotEqual(token, new_token)
 
184
        # Trying to confirm with the old token does not work.
 
185
        self.assertRaises(LookupError, self._registrar.confirm, token)
 
186
        # Confirm once more, this time with the new token, as the moderator
 
187
        # approving the subscription.  Now she's a member.
 
188
        done_token = self._registrar.confirm(new_token)
 
189
        # The token is None, signifying that the member has been subscribed.
 
190
        self.assertIsNone(done_token)
 
191
        member = self._mlist.regular_members.get_member('anne@example.com')
 
192
        self.assertEqual(member.address, self._anne)
 
193
 
 
194
    def test_discard_waiting_for_confirmation(self):
 
195
        # While waiting for a user to confirm their subscription, we discard
 
196
        # the workflow.
 
197
        self._mlist.subscription_policy = SubscriptionPolicy.confirm
 
198
        self._anne.verified_on = now()
 
199
        # Runs until subscription confirmation.
 
200
        token = self._registrar.register(self._anne)
 
201
        self.assertIsNotNone(token)
 
202
        member = self._mlist.regular_members.get_member('anne@example.com')
 
203
        self.assertIsNone(member)
 
204
        # Now discard the subscription request.
 
205
        self._registrar.discard(token)
 
206
        # Trying to confirm the token now results in an exception.
 
207
        self.assertRaises(LookupError, self._registrar.confirm, token)