# Branch: ~elopio/ubuntu-sso-client/url_vars_tests
# -*- coding: utf-8 -*-
#
# Author: Natalia Bidart <natalia.bidart@canonical.com>
# Author: Alejandro J. Cura <alecu@canonical.com>
#
# Copyright 2010-2012 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the
# OpenSSL library under certain conditions as described in each
# individual source file, and distribute linked combinations
# including the two.
# You must obey the GNU General Public License in all respects
# for all of the code used other than OpenSSL.  If you modify
# file(s) with this exception, you may extend this exception to your
# version of the file(s), but you are not obligated to do so.  If you
# do not wish to do so, delete this exception statement from your
# version.  If you delete this exception statement from all source
# files in the program, then also delete it here.
"""Tests for the SSO account code."""

from __future__ import unicode_literals

import copy
import os

# pylint: disable=W0621,F0401,E0611
try:
    import urllib.request as url_lib
except ImportError:
    import urllib2 as url_lib
# pylint: enable=W0621,F0401,E0611

from twisted.trial.unittest import TestCase
from twisted.internet import defer

from ubuntu_sso import account
from ubuntu_sso.account import (
    Account,
    AuthenticationError,
    EmailTokenError,
    InvalidEmailError,
    InvalidPasswordError,
    NewPasswordError,
    RegistrationError,
    ResetPasswordTokenError,
    SSO_STATUS_OK,
    SSO_STATUS_ERROR,
)
from ubuntu_sso.tests import (
    APP_NAME,
    CAPTCHA_ID,
    CAPTCHA_PATH,
    CAPTCHA_SOLUTION,
    EMAIL,
    EMAIL_TOKEN,
    NAME,
    PASSWORD,
    RESET_PASSWORD_TOKEN,
    TOKEN,
    TOKEN_NAME,
)
from ubuntu_sso.utils import compat


# Error bodies the fake restful client attaches to the WebClientError it
# raises for password-reset failures.
CANT_RESET_PASSWORD_CONTENT = (
    "CanNotResetPassowrdError: "
    "Can't reset password for this account"
)
RESET_TOKEN_INVALID_CONTENT = "AuthToken matching query does not exist."

# Email (also reused as an email token) that the fake client answers with an
# 'Email already registered' error.
EMAIL_ALREADY_REGISTERED = 'a@example.com'

# Canned replies for the registration and password-reset fake calls.
STATUS_UNKNOWN = {'status': 'yadda-yadda'}
STATUS_ERROR = {
    'status': SSO_STATUS_ERROR,
    'errors': {'something': ['Bla', 'Ble']},
}
STATUS_OK = {'status': SSO_STATUS_OK}

# Canned replies for the email validation fake call.
STATUS_EMAIL_UNKNOWN = {'status': 'yadda-yadda'}
STATUS_EMAIL_ERROR = {'errors': {'email_token': ['Error1', 'Error2']}}
STATUS_EMAIL_OK = {'email': EMAIL}

# Reply of the fake 'captchas.new' call: points at the local captcha image.
FAKE_NEW_CAPTCHA = {
    'image_url': "file://" + compat.text_type(CAPTCHA_PATH),
    'captcha_id': CAPTCHA_ID,
}


class FakeWebClientResponse(object):
    """A fake webclient.Response.

    The ``content`` class attribute holds the raw bytes of the file found at
    CAPTCHA_PATH, read once when the class is defined.
    """

    # Read through a context manager so the file handle is closed right away
    # instead of being left open until the garbage collector reclaims it.
    with open(CAPTCHA_PATH, "rb") as _captcha_file:
        content = _captcha_file.read()
    del _captcha_file  # keep the (closed) file object off the class


class FakeWebClient(object):
    """Minimal webclient replacement that tracks whether it is running."""

    def __init__(self):
        # Flipped to False by shutdown(); tests inspect it afterwards.
        self.started = True

    def request(self, url):
        """Ignore 'url' and answer with a new FakeWebClientResponse."""
        response = FakeWebClientResponse()
        return response

    def shutdown(self):
        """Mark this webclient as no longer started."""
        self.started = False


class FakeRestfulClient(object):
    """Restfulclient replacement that answers every call with canned data.

    Each remote method 'a.b' is served by the local method 'fake_a_b'; see
    restcall for the dispatching.
    """

    # Value reported as 'preferred_email' by fake_accounts_me.
    preferred_email = EMAIL

    def __init__(self):
        # Flipped to False by shutdown(); tests inspect it afterwards.
        self.started = True

    def shutdown(self):
        """Mark this restfulclient as no longer started."""
        self.started = False

    def fake_captchas_new(self):
        """Answer with the local fake captcha description."""
        return FAKE_NEW_CAPTCHA

    def fake_registration_register(self, email, password, displayname,
                                    captcha_id, captcha_solution):
        """Fake a registration; the reply depends only on the arguments."""
        if email == EMAIL_ALREADY_REGISTERED:
            return {'status': SSO_STATUS_ERROR,
                    'errors': {'email': 'Email already registered'}}

        if captcha_id is None and captcha_solution is None:
            return STATUS_UNKNOWN

        captcha_is_right = (captcha_id == CAPTCHA_ID and
                            captcha_solution == CAPTCHA_SOLUTION)
        if captcha_is_right:
            return STATUS_OK
        return STATUS_ERROR

    def fake_registration_request_password_reset_token(self, email):
        """Fake a reset-token request; the reply depends only on 'email'."""
        if email is None:
            return STATUS_UNKNOWN
        if email == EMAIL:
            return STATUS_OK
        raise account.WebClientError("Misc error",
                                     CANT_RESET_PASSWORD_CONTENT)

    def fake_registration_set_new_password(self, email, token, new_password):
        """Fake setting a new password; the reply depends on the arguments."""
        nothing_given = (email is None and token is None and
                         new_password is None)
        if nothing_given:
            return STATUS_UNKNOWN
        if email == EMAIL and token == RESET_PASSWORD_TOKEN:
            return STATUS_OK
        raise account.WebClientError("Misc error",
                                     RESET_TOKEN_INVALID_CONTENT)

    def fake_authentications_authenticate(self, token_name):
        """Fake an authentication; succeed for names based on TOKEN_NAME."""
        if token_name.startswith(TOKEN_NAME):
            return TOKEN
        raise account.WebClientError()

    def fake_accounts_validate_email(self, email_token):
        """Fake an email validation; the reply depends on 'email_token'."""
        if email_token is None:
            return STATUS_EMAIL_UNKNOWN

        if email_token == EMAIL_ALREADY_REGISTERED:
            already_registered = {
                'status': SSO_STATUS_ERROR,
                'errors': {'email': 'Email already registered'}
            }
            return already_registered

        if email_token == EMAIL_TOKEN:
            return STATUS_EMAIL_OK
        return STATUS_EMAIL_ERROR

    def fake_accounts_me(self):
        """Answer with fixed account details for the 'me' call."""
        details = {'username': 'Wh46bKY',
                   'preferred_email': self.preferred_email,
                   'displayname': '',
                   'unverified_emails': ['aaaaaa@example.com'],
                   'verified_emails': [],
                   'openid_identifier': 'Wh46bKY'}
        return details

    def check_all_kwargs_unicode(self, **kwargs):
        """Raise AssertionError if any keyword argument value is binary."""
        for name in kwargs:
            if isinstance(kwargs[name], compat.binary_type):
                raise AssertionError(
                    "Error: kwarg '%s' is non-unicode." % name)

    def restcall(self, method_name, **kwargs):
        """Fake an async restcall by dispatching to a 'fake_*' method.

        Dots in 'method_name' become underscores to find the local method.
        The outcome is wrapped in an already fired deferred: a success
        with the returned value, or a failure with the raised exception.
        """
        self.check_all_kwargs_unicode(**kwargs)
        local_name = "fake_" + method_name.replace(".", "_")
        handler = getattr(self, local_name)
        try:
            reply = handler(**kwargs)
        # pylint: disable=W0703
        except Exception as error:
            return defer.fail(error)
        return defer.succeed(reply)


class AccountTestCase(TestCase):
    """Test suite for the SSO login processor.

    Every test runs an Account instance against a FakeRestfulClient, so
    no network access is needed. A cleanup verifies that the fake
    restfulclient was shut down by the end of each test.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """Create the processor and patch in the fake restfulclient."""
        yield super(AccountTestCase, self).setUp()

        def fake_urlopen(url):
            """Fake an urlopen which will read from the disk."""
            f = open(url)
            self.addCleanup(f.close)
            return f

        self.patch(url_lib, 'urlopen', fake_urlopen)  # fd to the path
        self.processor = Account()
        self.register_kwargs = dict(email=EMAIL, password=PASSWORD,
                                    displayname=NAME,
                                    captcha_id=CAPTCHA_ID,
                                    captcha_solution=CAPTCHA_SOLUTION)
        self.login_kwargs = dict(email=EMAIL, password=PASSWORD,
                                 token_name=TOKEN_NAME)
        self.frc = FakeRestfulClient()
        # Whatever arguments RestfulClient is built with, hand out the
        # same fake instance so the tests can inspect and tweak it.
        self.patch(account.restful, "RestfulClient",
                   lambda *args, **kwargs: self.frc)
        self.addCleanup(self.verify_frc_shutdown)

    def verify_frc_shutdown(self):
        """Verify that the FakeRestfulClient was stopped."""
        assert self.frc.started is False, "Restfulclient must be shut down."

    @defer.inlineCallbacks
    def test_generate_captcha(self):
        """Captcha can be generated."""
        filename = self.mktemp()
        self.addCleanup(lambda: os.remove(filename)
                                if os.path.exists(filename) else None)
        wc = FakeWebClient()
        self.patch(account.webclient, "webclient_factory", lambda: wc)
        captcha_id = yield self.processor.generate_captcha(filename)
        self.assertEqual(CAPTCHA_ID, captcha_id, 'captcha id must be correct.')
        self.assertTrue(os.path.isfile(filename), '%s must exist.' % filename)

        # The fake response content is read in binary mode, so compare raw
        # bytes: a text-mode read would try to decode the data on Python 3.
        with open(CAPTCHA_PATH, "rb") as f:
            expected = f.read()
        with open(filename, "rb") as f:
            actual = f.read()
        self.assertEqual(expected, actual, 'captcha image must be correct.')
        self.assertFalse(wc.started, "Webclient must be shut down.")

    @defer.inlineCallbacks
    def test_register_user_checks_valid_email(self):
        """Email is validated."""
        self.register_kwargs['email'] = 'notavalidemail'
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidEmailError)

    @defer.inlineCallbacks
    def test_register_user_checks_valid_password(self):
        """Password is validated."""
        self.register_kwargs['password'] = ''
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

        # 7 chars, one less than expected
        self.register_kwargs['password'] = 'tesT3it'
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

        self.register_kwargs['password'] = 'test3it!'  # no upper case
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

        self.register_kwargs['password'] = 'testIt!!'  # no number
        d = self.processor.register_user(**self.register_kwargs)
        yield self.assertFailure(d, InvalidPasswordError)

    # register

    @defer.inlineCallbacks
    def test_register_user_if_status_ok(self):
        """A user is successfully registered into the SSO server."""
        result = yield self.processor.register_user(**self.register_kwargs)
        self.assertEqual(EMAIL, result, 'registration was successful.')

    @defer.inlineCallbacks
    def test_register_user_if_status_error(self):
        """Proper error is raised if register fails."""
        self.register_kwargs['captcha_id'] = CAPTCHA_ID * 2  # incorrect
        d = self.processor.register_user(**self.register_kwargs)
        failure = yield self.assertFailure(d, RegistrationError)
        for k, val in failure.args[0].items():
            self.assertIn(k, STATUS_ERROR['errors'])
            self.assertEqual(val, "\n".join(STATUS_ERROR['errors'][k]))

    @defer.inlineCallbacks
    def test_register_user_if_status_error_with_string_message(self):
        """Proper error is raised if register fails with a string message."""
        self.register_kwargs['email'] = EMAIL_ALREADY_REGISTERED
        d = self.processor.register_user(**self.register_kwargs)
        failure = yield self.assertFailure(d, RegistrationError)
        for k, val in failure.args[0].items():
            self.assertIn(k, {'email': 'Email already registered'})
            self.assertEqual(val, 'Email already registered')

    @defer.inlineCallbacks
    def test_register_user_if_status_unknown(self):
        """Proper error is raised if register returns an unknown status."""
        self.register_kwargs['captcha_id'] = None
        self.register_kwargs['captcha_solution'] = None
        d = self.processor.register_user(**self.register_kwargs)
        failure = yield self.assertFailure(d, RegistrationError)
        # Look into .args explicitly: exceptions only support 'in' on
        # Python 2, where it searches the very same tuple.
        self.assertIn('Received unknown status: %s' % STATUS_UNKNOWN,
                      failure.args)

    # login

    @defer.inlineCallbacks
    def test_login_if_http_error(self):
        """Proper error is raised if authentication fails."""
        # use an invalid token name
        self.login_kwargs['token_name'] = APP_NAME * 2
        d = self.processor.login(**self.login_kwargs)
        yield self.assertFailure(d, AuthenticationError)

    @defer.inlineCallbacks
    def test_login_if_no_error(self):
        """A user can be successfully logged in into the SSO service."""
        result = yield self.processor.login(**self.login_kwargs)
        self.assertEqual(TOKEN, result, 'authentication was successful.')

    # is_validated

    @defer.inlineCallbacks
    def test_is_validated(self):
        """If preferred email is not None, user is validated."""
        result = yield self.processor.is_validated(token=TOKEN)
        self.assertTrue(result, 'user must be validated.')

    @defer.inlineCallbacks
    def test_is_not_validated(self):
        """If preferred email is None, user is not validated."""
        self.frc.preferred_email = None
        result = yield self.processor.is_validated(token=TOKEN)
        self.assertFalse(result, 'user must not be validated.')

    @defer.inlineCallbacks
    def test_is_not_validated_empty_result(self):
        """If the 'me' information is empty, user is not validated."""
        self.patch(self.frc, "fake_accounts_me", lambda *args: {})
        result = yield self.processor.is_validated(token=TOKEN)
        self.assertFalse(result, 'user must not be validated.')

    # validate_email

    @defer.inlineCallbacks
    def test_validate_email_if_status_ok(self):
        """An email is successfully validated in the SSO server."""
        self.login_kwargs['email_token'] = EMAIL_TOKEN  # valid email token
        result = yield self.processor.validate_email(**self.login_kwargs)
        self.assertEqual(TOKEN, result, 'email validation was successful.')

    @defer.inlineCallbacks
    def test_validate_email_if_status_error(self):
        """Proper error is raised if email validation fails."""
        self.login_kwargs['email_token'] = EMAIL_TOKEN * 2  # invalid token
        d = self.processor.validate_email(**self.login_kwargs)
        failure = yield self.assertFailure(d, EmailTokenError)
        for k, val in failure.args[0].items():
            self.assertIn(k, STATUS_EMAIL_ERROR['errors'])
            self.assertEqual(val, "\n".join(STATUS_EMAIL_ERROR['errors'][k]))

    @defer.inlineCallbacks
    def test_validate_email_if_status_error_with_string_message(self):
        """Proper error is raised if validation fails with a string message."""
        self.login_kwargs['email_token'] = EMAIL_ALREADY_REGISTERED
        d = self.processor.validate_email(**self.login_kwargs)
        failure = yield self.assertFailure(d, EmailTokenError)
        for k, val in failure.args[0].items():
            self.assertIn(k, {'email': 'Email already registered'})
            self.assertEqual(val, 'Email already registered')

    @defer.inlineCallbacks
    def test_validate_email_if_status_unknown(self):
        """Proper error is raised if email validation returns unknown."""
        self.login_kwargs['email_token'] = None
        d = self.processor.validate_email(**self.login_kwargs)
        failure = yield self.assertFailure(d, EmailTokenError)
        # STATUS_EMAIL_UNKNOWN is what the fake validation call replies with
        # for a None token.
        self.assertIn('Received invalid reply: %s' % STATUS_EMAIL_UNKNOWN,
                      failure.args)

    # reset_password

    @defer.inlineCallbacks
    def test_request_password_reset_token_if_status_ok(self):
        """A reset password token is successfully sent."""
        result = yield self.processor.request_password_reset_token(email=EMAIL)
        self.assertEqual(EMAIL, result,
                         'password reset token must be successful.')

    @defer.inlineCallbacks
    def test_request_password_reset_token_if_http_error(self):
        """Proper error is raised if password token request fails."""
        d = self.processor.request_password_reset_token(email=EMAIL * 2)
        exc = yield self.assertFailure(d, ResetPasswordTokenError)
        self.assertIn(CANT_RESET_PASSWORD_CONTENT, exc.args)

    @defer.inlineCallbacks
    def test_request_password_reset_token_if_status_unknown(self):
        """Proper error is raised if password token request returns unknown."""
        d = self.processor.request_password_reset_token(email=None)
        exc = yield self.assertFailure(d, ResetPasswordTokenError)
        self.assertIn('Received invalid reply: %s' % STATUS_UNKNOWN, exc.args)

    @defer.inlineCallbacks
    def test_set_new_password_if_status_ok(self):
        """A new password is successfully set."""
        result = yield self.processor.set_new_password(
            email=EMAIL, token=RESET_PASSWORD_TOKEN, new_password=PASSWORD)
        self.assertEqual(EMAIL, result,
                         'new password must be set successfully.')

    @defer.inlineCallbacks
    def test_set_new_password_if_http_error(self):
        """Proper error is raised if setting a new password fails."""
        d = self.processor.set_new_password(email=EMAIL * 2,
                                            token=RESET_PASSWORD_TOKEN * 2,
                                            new_password=PASSWORD)
        exc = yield self.assertFailure(d, NewPasswordError)
        self.assertIn(RESET_TOKEN_INVALID_CONTENT, exc.args)

    @defer.inlineCallbacks
    def test_set_new_password_if_status_unknown(self):
        """Proper error is raised if setting a new password returns unknown."""
        d = self.processor.set_new_password(email=None, token=None,
                                            new_password=None)
        exc = yield self.assertFailure(d, NewPasswordError)
        self.assertIn('Received invalid reply: %s' % STATUS_UNKNOWN, exc.args)


class EnvironOverridesTestCase(TestCase):
    """Some URLs can be set from the environment for testing/QA purposes.

    Each test swaps os.environ for a plain dict copy, so the changes it
    makes never reach the real process environment.
    """

    @staticmethod
    def _reload_account():
        """Reload the account module, on either Python 2 or Python 3."""
        try:
            from importlib import reload as reload_module
        except ImportError:
            # Python 2: importlib has no reload(), but it is a builtin.
            reload_module = reload
        reload_module(account)

    def test_override_service_url(self):
        """The service url can be set from the env var USSOC_SERVICE_URL."""
        # os.environ.copy() gives an independent plain dict, whereas
        # copy.copy(os.environ) shares the underlying mapping and would
        # leak this variable into the real environment.
        test_environ = os.environ.copy()
        fake_url = 'this is not really a URL, but ends with slash: /'
        test_environ['USSOC_SERVICE_URL'] = fake_url
        self.patch(os, 'environ', test_environ)
        proc = Account()
        self.assertEqual(proc.service_url, fake_url)

    def test_default_service_url(self):
        """If the environ is unset, the default service url is used."""
        # Registered before the patch: assuming trial runs cleanups in
        # reverse order, the module is reloaded after os.environ is restored.
        self.addCleanup(self._reload_account)
        test_environ = os.environ.copy()
        for variable in ('SSO_AUTH_BASE_URL', 'USSOC_SERVICE_URL'):
            test_environ.pop(variable, None)
        self.patch(os, 'environ', test_environ)
        self._reload_account()
        proc = account.Account()
        self.assertEqual(proc.service_url, 'https://login.ubuntu.com/api/1.0/')

    def test_service_url_as_parameter(self):
        """If the parameter service url is given, is used."""
        expected = 'http://foo/bar/baz/'
        proc = Account(service_url=expected)
        self.assertEqual(proc.service_url, expected)

    def test_service_url_from_sso_auth_var(self):
        """If the env var SSO_AUTH_BASE_URL is set, it's used as base URL."""
        # Registered before the patch: assuming trial runs cleanups in
        # reverse order, the module is reloaded after os.environ is restored.
        self.addCleanup(self._reload_account)
        test_environ = os.environ.copy()
        test_environ['SSO_AUTH_BASE_URL'] = 'http://login.test.url'
        test_environ.pop('USSOC_SERVICE_URL', None)
        self.patch(os, 'environ', test_environ)
        self._reload_account()
        test_account = account.Account()
        self.assertEqual(
            test_account.service_url, 'http://login.test.url/api/1.0/')