~roadmr/canonical-identity-provider/fix-deprecation-warnings-1

« back to all changes in this revision

Viewing changes to src/identityprovider/tests/test_utils.py

[r=michael.nelson] Add new webservices app, refactor code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
# Copyright 2010 Canonical Ltd.  This software is licensed under the
2
2
# GNU Affero General Public License version 3 (see the file LICENSE).
3
 
import json
4
3
import os
5
4
import random
6
 
import re
7
 
import urllib2
8
 
import urlparse
9
5
from datetime import date, datetime, time, timedelta
10
6
from uuid import uuid4
11
7
 
18
14
from identityprovider import utils
19
15
from identityprovider.models.person import Person
20
16
from identityprovider.utils import (
21
 
    NameAlreadyTaken,
22
 
    NoSuchAccount,
23
 
    NotPlaceholderAccount,
24
 
    SSHKeyAdditionDataError,
25
 
    SSHKeyAdditionTypeError,
26
 
    SSHKeyCompromisedError,
27
 
    add_lp_ssh_key,
28
17
    canonical_url,
29
18
    clean_sessions_until,
30
 
    delete_lp_ssh_key,
31
19
    generate_random_string,
32
 
    get_lp_ssh_keys,
33
 
    get_lp_username,
34
 
    http_request_with_timeout,
35
20
    polite_form_errors,
36
21
    redirection_url_for_token,
37
 
    set_lp_username,
38
22
    validate_launchpad_password,
39
23
)
40
 
from identityprovider.tests.utils import (
41
 
    MockHandler,
42
 
    SSOBaseTestCase,
43
 
)
 
24
from identityprovider.tests.utils import SSOBaseTestCase
44
25
from identityprovider.utils import get_provider_url
45
26
 
46
27
 
75
56
        self.assertFalse(validate_launchpad_password('a', 'b'))
76
57
 
77
58
 
78
 
class HttpRequestWithTimeoutTestCase(SSOBaseTestCase):
79
 
 
80
 
    def setUp(self):
81
 
        super(HttpRequestWithTimeoutTestCase, self).setUp()
82
 
        self.exception = None
83
 
        self.output = None
84
 
        self.headers = None
85
 
 
86
 
        class MockDatafile(object):
87
 
 
88
 
            def __init__(self, output, headers):
89
 
                self.output = output
90
 
                self.headers = headers
91
 
 
92
 
            def read(self):
93
 
                return self.output
94
 
 
95
 
            def info(self):
96
 
                return self.headers
97
 
 
98
 
        def mock_urlopen(request):
99
 
            self.request = request
100
 
            if self.exception:
101
 
                raise urllib2.URLError("arg")
102
 
            return MockDatafile(self.output, self.headers)
103
 
 
104
 
        self.patch(urllib2, 'urlopen', new=mock_urlopen)
105
 
 
106
 
    def test_without_data_and_successful_outcome(self):
107
 
        self.output = "output"
108
 
        response = http_request_with_timeout("http://example.com")
109
 
        self.assertEqual("output", response.read())
110
 
 
111
 
    def test_without_data_and_failing(self):
112
 
        self.exception = True
113
 
        self.assertRaises(
114
 
            urllib2.URLError,
115
 
            http_request_with_timeout, "http://example.com")
116
 
 
117
 
    def test_with_data_and_successful_outcome(self):
118
 
        self.output = "output"
119
 
        http_request_with_timeout("http://example.com",
120
 
                                  {"i1": "v1", "i2": "v2"})
121
 
        self.assertTrue("&" in self.request.data)
122
 
 
123
 
 
124
59
class RedirectionURLForTokenTestCase(SSOBaseTestCase):
125
60
 
126
61
    def test_if_token_is_none(self):
297
232
        charset = '!@#$%^&*()'
298
233
        string1 = generate_random_string(200, charset=charset)
299
234
        self.assertTrue(all([ch in charset for ch in string1]))
300
 
 
301
 
 
302
 
@override_settings(
303
 
    LP_API_URL='http://api.launchpad.net/test',
304
 
    LP_API_CONSUMER_KEY='SSO', LP_API_TOKEN='sso-token',
305
 
    LP_API_TOKEN_SECRET='sso-secret')
306
 
class LPUsernameAPITestCase(SSOBaseTestCase):
307
 
 
308
 
    def setUp(self):
309
 
        super(LPUsernameAPITestCase, self).setUp()
310
 
        self.lp_api_handler = MockHandler()
311
 
        self.mock_lp_api_opener = self.patch(
312
 
            'identityprovider.utils._lp_api_opener',
313
 
            return_value=urllib2.build_opener(self.lp_api_handler))
314
 
 
315
 
    def test_set_lp_username(self):
316
 
        self.lp_api_handler.set_next_response(200, '')
317
 
        self.assertIsNone(set_lp_username('foo', 'bar', dry_run=True))
318
 
 
319
 
        [request] = self.lp_api_handler.requests
320
 
        self.assertEqual('POST', request.get_method())
321
 
        self.assertEqual(['Authorization'], request.headers.keys())
322
 
        self.assertEqual(
323
 
            'OAuth oauth_nonce="...", oauth_timestamp="...", '
324
 
            'oauth_version="1.0", oauth_signature_method="PLAINTEXT", '
325
 
            'oauth_consumer_key="SSO", oauth_token="sso-token", '
326
 
            'oauth_signature="%26sso-secret"',
327
 
            re.sub(r'"\d+"', '"..."', request.headers['Authorization']))
328
 
        self.assertEqual(
329
 
            'http://api.launchpad.net/test/people', request.get_full_url())
330
 
        self.assertEqual(
331
 
            {'ws.op': ['setUsernameFromSSO'], 'openid_identifier': ['foo'],
332
 
             'name': ['bar'], 'dry_run': ['true']},
333
 
            urlparse.parse_qs(request.data))
334
 
 
335
 
    def test_set_lp_username_taken(self):
336
 
        self.lp_api_handler.set_next_response(
337
 
            400,
338
 
            'name: taken-name is already in use by another person or team.')
339
 
        self.assertRaises(
340
 
            NameAlreadyTaken, set_lp_username, 'foo', 'taken-name')
341
 
 
342
 
    def test_set_lp_username_blacklisted(self):
343
 
        self.lp_api_handler.set_next_response(
344
 
            400,
345
 
            'name: The name 'private-name' has been blocked by the '
346
 
            'Launchpad administrators. Contact Launchpad Support if you want '
347
 
            'to use this name.')
348
 
        self.assertRaises(
349
 
            NameAlreadyTaken, set_lp_username, 'foo', 'private-name')
350
 
 
351
 
    def test_set_lp_username_existing(self):
352
 
        self.lp_api_handler.set_next_response(
353
 
            400,
354
 
            'An account for that OpenID identifier already exists.')
355
 
        self.assertRaises(
356
 
            NotPlaceholderAccount, set_lp_username, 'taken-openid',
357
 
            'bar')
358
 
 
359
 
    def test_get_lp_username(self):
360
 
        self.lp_api_handler.set_next_response(200, json.dumps("bar"))
361
 
        self.assertEqual("bar", get_lp_username('foo'))
362
 
 
363
 
        [request] = self.lp_api_handler.requests
364
 
        self.assertEqual('GET', request.get_method())
365
 
        self.assertEqual(['Authorization'], request.headers.keys())
366
 
        self.assertEqual(
367
 
            'OAuth oauth_nonce="...", oauth_timestamp="...", '
368
 
            'oauth_version="1.0", oauth_signature_method="PLAINTEXT", '
369
 
            'oauth_consumer_key="SSO", oauth_token="sso-token", '
370
 
            'oauth_signature="%26sso-secret"',
371
 
            re.sub(r'"\d+"', '"..."', request.headers['Authorization']))
372
 
        self.assertEqual(
373
 
            'http://api.launchpad.net/test/people?'
374
 
            'ws.op=getUsernameForSSO&openid_identifier=foo',
375
 
            request.get_full_url())
376
 
 
377
 
 
378
 
@override_settings(
379
 
    LP_API_URL='http://api.launchpad.net/test',
380
 
    LP_API_CONSUMER_KEY='SSO', LP_API_TOKEN='sso-token',
381
 
    LP_API_TOKEN_SECRET='sso-secret')
382
 
class LPSSHKeyAPITestCase(SSOBaseTestCase):
383
 
 
384
 
    def setUp(self):
385
 
        super(LPSSHKeyAPITestCase, self).setUp()
386
 
        self.lp_api_handler = MockHandler()
387
 
        self.mock_lp_api_opener = self.patch(
388
 
            'identityprovider.utils._lp_api_opener',
389
 
            return_value=urllib2.build_opener(self.lp_api_handler))
390
 
 
391
 
    def assert_authorization_headers(self, request):
392
 
        self.assertEqual(['Authorization'], request.headers.keys())
393
 
        self.assertEqual(
394
 
            'OAuth oauth_nonce="...", oauth_timestamp="...", '
395
 
            'oauth_version="1.0", oauth_signature_method="PLAINTEXT", '
396
 
            'oauth_consumer_key="SSO", oauth_token="sso-token", '
397
 
            'oauth_signature="%26sso-secret"',
398
 
            re.sub(r'"\d+"', '"..."', request.headers['Authorization']))
399
 
 
400
 
    def test_get_ssh_keys_nonexistant(self):
401
 
        self.lp_api_handler.set_next_response(200, 'null')
402
 
        self.assertIsNone(get_lp_ssh_keys('openid'))
403
 
 
404
 
        self.assertEqual(1, len(self.lp_api_handler.requests))
405
 
        request = self.lp_api_handler.requests[0]
406
 
        self.assertEqual('GET', request.get_method())
407
 
        self.assert_authorization_headers(request)
408
 
 
409
 
        self.assertEqual(
410
 
            'http://api.launchpad.net/test/people?'
411
 
            'ws.op=getSSHKeysForSSO&openid_identifier=openid',
412
 
            request.get_full_url())
413
 
 
414
 
    def test_get_ssh_keys_no_keys(self):
415
 
        self.lp_api_handler.set_next_response(200, '[]')
416
 
        self.assertEqual([], get_lp_ssh_keys('openid'))
417
 
 
418
 
    def test_add_ssh_key_with_nonexistant_user(self):
419
 
        self.lp_api_handler.set_next_response(
420
 
            400, "No account found for openid identifier 'openid'")
421
 
        self.assertRaises(
422
 
            NoSuchAccount,
423
 
            add_lp_ssh_key,
424
 
            'openid', 'ssh-rsa keytext comment', False
425
 
        )
426
 
        [request] = self.lp_api_handler.requests
427
 
        self.assertEqual('POST', request.get_method())
428
 
        self.assert_authorization_headers(request)
429
 
 
430
 
    def test_add_ssh_key_with_bad_key_type(self):
431
 
        self.lp_api_handler.set_next_response(
432
 
            400, "Invalid SSH key type: 'flub'")
433
 
        self.assertRaises(
434
 
            SSHKeyAdditionTypeError,
435
 
            add_lp_ssh_key,
436
 
            'openid', 'flub keytext comment', False
437
 
        )
438
 
        [request] = self.lp_api_handler.requests
439
 
        self.assertEqual('POST', request.get_method())
440
 
        self.assert_authorization_headers(request)
441
 
 
442
 
    def test_add_ssh_key_with_bad_key_data(self):
443
 
        self.lp_api_handler.set_next_response(
444
 
            400, "Invalid SSH key data: 'keywithnospaces'")
445
 
        self.assertRaises(
446
 
            SSHKeyAdditionDataError,
447
 
            add_lp_ssh_key,
448
 
            'openid', 'keywithnospaces', False
449
 
        )
450
 
        [request] = self.lp_api_handler.requests
451
 
        self.assertEqual('POST', request.get_method())
452
 
        self.assert_authorization_headers(request)
453
 
 
454
 
    def test_add_ssh_key_with_compromised_key(self):
455
 
        self.lp_api_handler.set_next_response(
456
 
            400,
457
 
            "This key cannot be added as it is known to be compromised.")
458
 
        self.assertRaises(
459
 
            SSHKeyCompromisedError,
460
 
            add_lp_ssh_key,
461
 
            'openid', 'compromised_key', False
462
 
        )
463
 
        [request] = self.lp_api_handler.requests
464
 
        self.assertEqual('POST', request.get_method())
465
 
        self.assert_authorization_headers(request)
466
 
 
467
 
    def test_add_ssh_key_works(self):
468
 
        self.lp_api_handler.set_next_response(200, "")
469
 
        self.assertIsNone(add_lp_ssh_key('openid', 'key_data', False))
470
 
        [request] = self.lp_api_handler.requests
471
 
        self.assertEqual('POST', request.get_method())
472
 
        self.assert_authorization_headers(request)
473
 
 
474
 
    def test_delete_ssh_key_with_nonexistant_user(self):
475
 
        self.lp_api_handler.set_next_response(
476
 
            400, "No account found for openid identifier 'openid'")
477
 
        self.assertRaises(
478
 
            NoSuchAccount,
479
 
            delete_lp_ssh_key,
480
 
            'openid', 'ssh-rsa keytext comment', False
481
 
        )
482
 
        [request] = self.lp_api_handler.requests
483
 
        self.assertEqual('POST', request.get_method())
484
 
        self.assert_authorization_headers(request)
485
 
 
486
 
    def test_delete_ssh_key_with_bad_key_type(self):
487
 
        self.lp_api_handler.set_next_response(
488
 
            400, "Invalid SSH key type: 'flub'")
489
 
        self.assertRaises(
490
 
            SSHKeyAdditionTypeError,
491
 
            delete_lp_ssh_key,
492
 
            'openid', 'flub keytext comment', False
493
 
        )
494
 
        [request] = self.lp_api_handler.requests
495
 
        self.assertEqual('POST', request.get_method())
496
 
        self.assert_authorization_headers(request)
497
 
 
498
 
    def test_delete_ssh_key_with_bad_key_data(self):
499
 
        self.lp_api_handler.set_next_response(
500
 
            400, "Invalid SSH key data: 'keywithnospaces'")
501
 
        self.assertRaises(
502
 
            SSHKeyAdditionDataError,
503
 
            delete_lp_ssh_key,
504
 
            'openid', 'keywithnospaces', False
505
 
        )
506
 
        [request] = self.lp_api_handler.requests
507
 
        self.assertEqual('POST', request.get_method())
508
 
        self.assert_authorization_headers(request)
509
 
 
510
 
    def test_delete_ssh_key_works(self):
511
 
        self.lp_api_handler.set_next_response(200, "")
512
 
        self.assertIsNone(delete_lp_ssh_key('openid', 'key_data', False))
513
 
        [request] = self.lp_api_handler.requests
514
 
        self.assertEqual('POST', request.get_method())
515
 
        self.assert_authorization_headers(request)