~ubuntu-branches/debian/sid/social-auth-core/sid

« back to all changes in this revision

Viewing changes to social_core/backends/open_id_connect.py

  • Committer: Package Import Robot
  • Author(s): Andre Bianchi
  • Date: 2018-02-22 19:49:12 UTC
  • Revision ID: package-import@ubuntu.com-20180222194912-4lqv8mlhnqc4ncd3
Tags: upstream-1.7.0
ImportĀ upstreamĀ versionĀ 1.7.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import datetime
 
2
from calendar import timegm
 
3
 
 
4
import six
 
5
 
 
6
from jwkest import JWKESTException
 
7
from jwkest.jwk import KEYS
 
8
from jwkest.jws import JWS
 
9
 
 
10
from .oauth import BaseOAuth2
 
11
from ..utils import cache
 
12
from ..exceptions import AuthTokenError
 
13
 
 
14
 
 
15
class OpenIdConnectAssociation(object):
 
16
    """ Use Association model to save the nonce by force."""
 
17
    def __init__(self, handle, secret='', issued=0, lifetime=0, assoc_type=''):
 
18
        self.handle = handle  # as nonce
 
19
        self.secret = secret.encode()  # not use
 
20
        self.issued = issued  # not use
 
21
        self.lifetime = lifetime  # not use
 
22
        self.assoc_type = assoc_type  # as state
 
23
 
 
24
 
 
25
class OpenIdConnectAuth(BaseOAuth2):
 
26
    """
 
27
    Base class for Open ID Connect backends.
 
28
    Currently only the code response type is supported.
 
29
    """
 
30
    # Override OIDC_ENDPOINT in your subclass to enable autoconfig of OIDC
 
31
    OIDC_ENDPOINT = None
 
32
    ID_TOKEN_MAX_AGE = 600
 
33
    DEFAULT_SCOPE = ['openid', 'profile', 'email']
 
34
    EXTRA_DATA = ['id_token', 'refresh_token', ('sub', 'id')]
 
35
    REDIRECT_STATE = False
 
36
    ACCESS_TOKEN_METHOD = 'POST'
 
37
    REVOKE_TOKEN_METHOD = 'GET'
 
38
    ID_KEY = 'sub'
 
39
    USERNAME_KEY = 'preferred_username'
 
40
    ID_TOKEN_ISSUER = ''
 
41
    ACCESS_TOKEN_URL = ''
 
42
    AUTHORIZATION_URL = ''
 
43
    REVOKE_TOKEN_URL = ''
 
44
    USERINFO_URL = ''
 
45
    JWKS_URI = ''
 
46
 
 
47
    def __init__(self, *args, **kwargs):
 
48
        self.id_token = None
 
49
        super(OpenIdConnectAuth, self).__init__(*args, **kwargs)
 
50
 
 
51
    def authorization_url(self):
 
52
        return self.AUTHORIZATION_URL or \
 
53
            self.oidc_config().get('authorization_endpoint')
 
54
 
 
55
    def access_token_url(self):
 
56
        return self.ACCESS_TOKEN_URL or \
 
57
            self.oidc_config().get('token_endpoint')
 
58
 
 
59
    def revoke_token_url(self, token, uid):
 
60
        return self.REVOKE_TOKEN_URL or \
 
61
            self.oidc_config().get('revocation_endpoint')
 
62
 
 
63
    def id_token_issuer(self):
 
64
        return self.ID_TOKEN_ISSUER or \
 
65
            self.oidc_config().get('issuer')
 
66
 
 
67
    def userinfo_url(self):
 
68
        return self.USERINFO_URL or \
 
69
            self.oidc_config().get('userinfo_endpoint')
 
70
 
 
71
    def jwks_uri(self):
 
72
        return self.JWKS_URI or \
 
73
            self.oidc_config().get('jwks_uri')
 
74
 
 
75
    @cache(ttl=86400)
 
76
    def oidc_config(self):
 
77
        return self.get_json(self.OIDC_ENDPOINT +
 
78
                             '/.well-known/openid-configuration')
 
79
 
 
80
    @cache(ttl=86400)
 
81
    def get_jwks_keys(self):
 
82
        keys = KEYS()
 
83
        keys.load_from_url(self.jwks_uri())
 
84
 
 
85
        # Add client secret as oct key so it can be used for HMAC signatures
 
86
        client_id, client_secret = self.get_key_and_secret()
 
87
        keys.add({'key': client_secret, 'kty': 'oct'})
 
88
        return keys
 
89
 
 
90
    def auth_params(self, state=None):
 
91
        """Return extra arguments needed on auth process."""
 
92
        params = super(OpenIdConnectAuth, self).auth_params(state)
 
93
        params['nonce'] = self.get_and_store_nonce(
 
94
            self.authorization_url(), state
 
95
        )
 
96
        return params
 
97
 
 
98
    def get_and_store_nonce(self, url, state):
 
99
        # Create a nonce
 
100
        nonce = self.strategy.random_string(64)
 
101
        # Store the nonce
 
102
        association = OpenIdConnectAssociation(nonce, assoc_type=state)
 
103
        self.strategy.storage.association.store(url, association)
 
104
        return nonce
 
105
 
 
106
    def get_nonce(self, nonce):
 
107
        try:
 
108
            return self.strategy.storage.association.get(
 
109
                server_url=self.authorization_url(),
 
110
                handle=nonce
 
111
            )[0]
 
112
        except IndexError:
 
113
            pass
 
114
 
 
115
    def remove_nonce(self, nonce_id):
 
116
        self.strategy.storage.association.remove([nonce_id])
 
117
 
 
118
    def validate_claims(self, id_token):
 
119
        if id_token['iss'] != self.id_token_issuer():
 
120
            raise AuthTokenError(self, 'Invalid issuer')
 
121
 
 
122
        client_id, client_secret = self.get_key_and_secret()
 
123
 
 
124
        if isinstance(id_token['aud'], six.string_types):
 
125
            id_token['aud'] = [id_token['aud']]
 
126
 
 
127
        if client_id not in id_token['aud']:
 
128
            raise AuthTokenError(self, 'Invalid audience')
 
129
 
 
130
        if len(id_token['aud']) > 1 and 'azp' not in id_token:
 
131
            raise AuthTokenError(self, 'Incorrect id_token: azp')
 
132
 
 
133
        if 'azp' in id_token and id_token['azp'] != client_id:
 
134
            raise AuthTokenError(self, 'Incorrect id_token: azp')
 
135
 
 
136
        utc_timestamp = timegm(datetime.datetime.utcnow().utctimetuple())
 
137
        if utc_timestamp > id_token['exp']:
 
138
            raise AuthTokenError(self, 'Signature has expired')
 
139
 
 
140
        if 'nbf' in id_token and utc_timestamp < id_token['nbf']:
 
141
            raise AuthTokenError(self, 'Incorrect id_token: nbf')
 
142
 
 
143
        # Verify the token was issued in the last 10 minutes
 
144
        iat_leeway = self.setting('ID_TOKEN_MAX_AGE', self.ID_TOKEN_MAX_AGE)
 
145
        if utc_timestamp > id_token['iat'] + iat_leeway:
 
146
            raise AuthTokenError(self, 'Incorrect id_token: iat')
 
147
 
 
148
        # Validate the nonce to ensure the request was not modified
 
149
        nonce = id_token.get('nonce')
 
150
        if not nonce:
 
151
            raise AuthTokenError(self, 'Incorrect id_token: nonce')
 
152
 
 
153
        nonce_obj = self.get_nonce(nonce)
 
154
        if nonce_obj:
 
155
            self.remove_nonce(nonce_obj.id)
 
156
        else:
 
157
            raise AuthTokenError(self, 'Incorrect id_token: nonce')
 
158
 
 
159
    def validate_and_return_id_token(self, jws):
 
160
        """
 
161
        Validates the id_token according to the steps at
 
162
        http://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation.
 
163
        """
 
164
        try:
 
165
            # Decode the JWT and raise an error if the sig is invalid
 
166
            id_token = JWS().verify_compact(jws.encode('utf-8'),
 
167
                                            self.get_jwks_keys())
 
168
        except JWKESTException:
 
169
            raise AuthTokenError(self, 'Signature verification failed')
 
170
 
 
171
        self.validate_claims(id_token)
 
172
 
 
173
        return id_token
 
174
 
 
175
    def request_access_token(self, *args, **kwargs):
 
176
        """
 
177
        Retrieve the access token. Also, validate the id_token and
 
178
        store it (temporarily).
 
179
        """
 
180
        response = self.get_json(*args, **kwargs)
 
181
        self.id_token = self.validate_and_return_id_token(response['id_token'])
 
182
        return response
 
183
 
 
184
    def user_data(self, access_token, *args, **kwargs):
 
185
        return self.get_json(self.userinfo_url(), headers={
 
186
            'Authorization': 'Bearer {0}'.format(access_token)
 
187
        })
 
188
 
 
189
    def get_user_details(self, response):
 
190
        username_key = self.setting('USERNAME_KEY', default=self.USERNAME_KEY)
 
191
        return {
 
192
            'username': response.get(username_key),
 
193
            'email': response.get('email'),
 
194
            'fullname': response.get('name'),
 
195
            'first_name': response.get('given_name'),
 
196
            'last_name': response.get('family_name'),
 
197
        }