~ubuntu-branches/debian/sid/social-auth-core/sid

« back to all changes in this revision

Viewing changes to social_core/tests/backends/open_id_connect.py

  • Committer: Package Import Robot
  • Author(s): Andre Bianchi
  • Date: 2018-02-22 19:49:12 UTC
  • Revision ID: package-import@ubuntu.com-20180222194912-4lqv8mlhnqc4ncd3
Tags: upstream-1.7.0
ImportĀ upstreamĀ versionĀ 1.7.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
from calendar import timegm
 
3
 
 
4
import os
 
5
import sys
 
6
import json
 
7
import datetime
 
8
import unittest2
 
9
 
 
10
try:
 
11
    from jwkest.jwk import RSAKey, KEYS
 
12
    from jwkest.jws import JWS
 
13
    from jwkest.jwt import b64encode_item
 
14
    NO_JWKEST = False
 
15
except ImportError:
 
16
    NO_JWKEST = True
 
17
 
 
18
from httpretty import HTTPretty
 
19
 
 
20
sys.path.insert(0, '..')
 
21
 
 
22
from ...exceptions import AuthTokenError
 
23
 
 
24
 
 
25
class OpenIdConnectTestMixin(object):
 
26
    """
 
27
    Mixin to test OpenID Connect consumers. Inheriting classes should also
 
28
    inherit OAuth2Test.
 
29
    """
 
30
    client_key = 'a-key'
 
31
    client_secret = 'a-secret-key'
 
32
    issuer = None  # id_token issuer
 
33
    openid_config_body = None
 
34
    key = None
 
35
 
 
36
    def setUp(self):
 
37
        super(OpenIdConnectTestMixin, self).setUp()
 
38
        test_root = os.path.dirname(os.path.dirname(__file__))
 
39
        self.key = RSAKey(kid='testkey').load(os.path.join(test_root, 'testkey.pem'))
 
40
        HTTPretty.register_uri(HTTPretty.GET,
 
41
          self.backend.OIDC_ENDPOINT + '/.well-known/openid-configuration',
 
42
          status=200,
 
43
          body=self.openid_config_body
 
44
        )
 
45
        oidc_config = json.loads(self.openid_config_body)
 
46
 
 
47
        def jwks(_request, _uri, headers):
 
48
            ks = KEYS()
 
49
            ks.add(self.key.serialize())
 
50
            return 200, headers, ks.dump_jwks()
 
51
 
 
52
        HTTPretty.register_uri(HTTPretty.GET,
 
53
                               oidc_config.get('jwks_uri'),
 
54
                               status=200,
 
55
                               body=jwks)
 
56
 
 
57
    def extra_settings(self):
 
58
        settings = super(OpenIdConnectTestMixin, self).extra_settings()
 
59
        settings.update({
 
60
            'SOCIAL_AUTH_{0}_KEY'.format(self.name): self.client_key,
 
61
            'SOCIAL_AUTH_{0}_SECRET'.format(self.name): self.client_secret,
 
62
            'SOCIAL_AUTH_{0}_ID_TOKEN_DECRYPTION_KEY'.format(self.name):
 
63
                self.client_secret
 
64
        })
 
65
        return settings
 
66
 
 
67
    def access_token_body(self, request, _url, headers):
 
68
        """
 
69
        Get the nonce from the request parameters, add it to the id_token, and
 
70
        return the complete response.
 
71
        """
 
72
        nonce = self.backend.data['nonce'].encode('utf-8')
 
73
        body = self.prepare_access_token_body(nonce=nonce)
 
74
        return 200, headers, body
 
75
 
 
76
    def get_id_token(self, client_key=None, expiration_datetime=None,
 
77
                     issue_datetime=None, nonce=None, issuer=None):
 
78
        """
 
79
        Return the id_token to be added to the access token body.
 
80
        """
 
81
        return {
 
82
            'iss': issuer,
 
83
            'nonce': nonce,
 
84
            'aud': client_key,
 
85
            'azp': client_key,
 
86
            'exp': expiration_datetime,
 
87
            'iat': issue_datetime,
 
88
            'sub': '1234'
 
89
        }
 
90
 
 
91
    def prepare_access_token_body(self, client_key=None, tamper_message=False,
 
92
                                  expiration_datetime=None,
 
93
                                  issue_datetime=None, nonce=None,
 
94
                                  issuer=None):
 
95
        """
 
96
        Prepares a provider access token response. Arguments:
 
97
 
 
98
        client_id       -- (str) OAuth ID for the client that requested
 
99
                                 authentication.
 
100
        expiration_time -- (datetime) Date and time after which the response
 
101
                                      should be considered invalid.
 
102
        """
 
103
 
 
104
        body = {'access_token': 'foobar', 'token_type': 'bearer'}
 
105
        client_key = client_key or self.client_key
 
106
        now = datetime.datetime.utcnow()
 
107
        expiration_datetime = expiration_datetime or \
 
108
                              (now + datetime.timedelta(seconds=30))
 
109
        issue_datetime = issue_datetime or now
 
110
        nonce = nonce or 'a-nonce'
 
111
        issuer = issuer or self.issuer
 
112
        id_token = self.get_id_token(
 
113
            client_key, timegm(expiration_datetime.utctimetuple()),
 
114
            timegm(issue_datetime.utctimetuple()), nonce, issuer)
 
115
 
 
116
        body['id_token'] = JWS(id_token, jwk=self.key, alg='RS256').sign_compact()
 
117
        if tamper_message:
 
118
            header, msg, sig = body['id_token'].split('.')
 
119
            id_token['sub'] = '1235'
 
120
            msg = b64encode_item(id_token).decode('utf-8')
 
121
            body['id_token'] = '.'.join([header, msg, sig])
 
122
 
 
123
        return json.dumps(body)
 
124
 
 
125
    def authtoken_raised(self, expected_message, **access_token_kwargs):
 
126
        self.access_token_body = self.prepare_access_token_body(
 
127
            **access_token_kwargs
 
128
        )
 
129
        with self.assertRaisesRegexp(AuthTokenError, expected_message):
 
130
            self.do_login()
 
131
 
 
132
    @unittest2.skipIf(NO_JWKEST, 'No Jwkest installed')
 
133
    def test_invalid_signature(self):
 
134
        self.authtoken_raised(
 
135
            'Token error: Signature verification failed',
 
136
            tamper_message=True
 
137
        )
 
138
 
 
139
    @unittest2.skipIf(NO_JWKEST, 'No Jwkest installed')
 
140
    def test_expired_signature(self):
 
141
        expiration_datetime = datetime.datetime.utcnow() - \
 
142
                              datetime.timedelta(seconds=30)
 
143
        self.authtoken_raised('Token error: Signature has expired',
 
144
                              expiration_datetime=expiration_datetime)
 
145
 
 
146
    @unittest2.skipIf(NO_JWKEST, 'No Jwkest installed')
 
147
    def test_invalid_issuer(self):
 
148
        self.authtoken_raised('Token error: Invalid issuer',
 
149
                              issuer='someone-else')
 
150
 
 
151
    @unittest2.skipIf(NO_JWKEST, 'No Jwkest installed')
 
152
    def test_invalid_audience(self):
 
153
        self.authtoken_raised('Token error: Invalid audience',
 
154
                              client_key='someone-else')
 
155
 
 
156
    @unittest2.skipIf(NO_JWKEST, 'No Jwkest installed')
 
157
    def test_invalid_issue_time(self):
 
158
        expiration_datetime = datetime.datetime.utcnow() - \
 
159
                              datetime.timedelta(hours=1)
 
160
        self.authtoken_raised('Token error: Incorrect id_token: iat',
 
161
                              issue_datetime=expiration_datetime)
 
162
 
 
163
    @unittest2.skipIf(NO_JWKEST, 'No Jwkest installed')
 
164
    def test_invalid_nonce(self):
 
165
        self.authtoken_raised(
 
166
            'Token error: Incorrect id_token: nonce',
 
167
            nonce='something-wrong'
 
168
        )