2
from calendar import timegm
6
from jwkest import JWKESTException
7
from jwkest.jwk import KEYS
8
from jwkest.jws import JWS
10
from .oauth import BaseOAuth2
11
from ..utils import cache
12
from ..exceptions import AuthTokenError
15
class OpenIdConnectAssociation(object):
16
""" Use Association model to save the nonce by force."""
17
def __init__(self, handle, secret='', issued=0, lifetime=0, assoc_type=''):
18
self.handle = handle # as nonce
19
self.secret = secret.encode() # not use
20
self.issued = issued # not use
21
self.lifetime = lifetime # not use
22
self.assoc_type = assoc_type # as state
25
class OpenIdConnectAuth(BaseOAuth2):
27
Base class for Open ID Connect backends.
28
Currently only the code response type is supported.
30
# Override OIDC_ENDPOINT in your subclass to enable autoconfig of OIDC
32
ID_TOKEN_MAX_AGE = 600
33
DEFAULT_SCOPE = ['openid', 'profile', 'email']
34
EXTRA_DATA = ['id_token', 'refresh_token', ('sub', 'id')]
35
REDIRECT_STATE = False
36
ACCESS_TOKEN_METHOD = 'POST'
37
REVOKE_TOKEN_METHOD = 'GET'
39
USERNAME_KEY = 'preferred_username'
42
AUTHORIZATION_URL = ''
47
def __init__(self, *args, **kwargs):
49
super(OpenIdConnectAuth, self).__init__(*args, **kwargs)
51
def authorization_url(self):
52
return self.AUTHORIZATION_URL or \
53
self.oidc_config().get('authorization_endpoint')
55
def access_token_url(self):
56
return self.ACCESS_TOKEN_URL or \
57
self.oidc_config().get('token_endpoint')
59
def revoke_token_url(self, token, uid):
60
return self.REVOKE_TOKEN_URL or \
61
self.oidc_config().get('revocation_endpoint')
63
def id_token_issuer(self):
64
return self.ID_TOKEN_ISSUER or \
65
self.oidc_config().get('issuer')
67
def userinfo_url(self):
68
return self.USERINFO_URL or \
69
self.oidc_config().get('userinfo_endpoint')
72
return self.JWKS_URI or \
73
self.oidc_config().get('jwks_uri')
76
def oidc_config(self):
77
return self.get_json(self.OIDC_ENDPOINT +
78
'/.well-known/openid-configuration')
81
def get_jwks_keys(self):
83
keys.load_from_url(self.jwks_uri())
85
# Add client secret as oct key so it can be used for HMAC signatures
86
client_id, client_secret = self.get_key_and_secret()
87
keys.add({'key': client_secret, 'kty': 'oct'})
90
def auth_params(self, state=None):
91
"""Return extra arguments needed on auth process."""
92
params = super(OpenIdConnectAuth, self).auth_params(state)
93
params['nonce'] = self.get_and_store_nonce(
94
self.authorization_url(), state
98
def get_and_store_nonce(self, url, state):
100
nonce = self.strategy.random_string(64)
102
association = OpenIdConnectAssociation(nonce, assoc_type=state)
103
self.strategy.storage.association.store(url, association)
106
def get_nonce(self, nonce):
108
return self.strategy.storage.association.get(
109
server_url=self.authorization_url(),
115
def remove_nonce(self, nonce_id):
116
self.strategy.storage.association.remove([nonce_id])
118
def validate_claims(self, id_token):
119
if id_token['iss'] != self.id_token_issuer():
120
raise AuthTokenError(self, 'Invalid issuer')
122
client_id, client_secret = self.get_key_and_secret()
124
if isinstance(id_token['aud'], six.string_types):
125
id_token['aud'] = [id_token['aud']]
127
if client_id not in id_token['aud']:
128
raise AuthTokenError(self, 'Invalid audience')
130
if len(id_token['aud']) > 1 and 'azp' not in id_token:
131
raise AuthTokenError(self, 'Incorrect id_token: azp')
133
if 'azp' in id_token and id_token['azp'] != client_id:
134
raise AuthTokenError(self, 'Incorrect id_token: azp')
136
utc_timestamp = timegm(datetime.datetime.utcnow().utctimetuple())
137
if utc_timestamp > id_token['exp']:
138
raise AuthTokenError(self, 'Signature has expired')
140
if 'nbf' in id_token and utc_timestamp < id_token['nbf']:
141
raise AuthTokenError(self, 'Incorrect id_token: nbf')
143
# Verify the token was issued in the last 10 minutes
144
iat_leeway = self.setting('ID_TOKEN_MAX_AGE', self.ID_TOKEN_MAX_AGE)
145
if utc_timestamp > id_token['iat'] + iat_leeway:
146
raise AuthTokenError(self, 'Incorrect id_token: iat')
148
# Validate the nonce to ensure the request was not modified
149
nonce = id_token.get('nonce')
151
raise AuthTokenError(self, 'Incorrect id_token: nonce')
153
nonce_obj = self.get_nonce(nonce)
155
self.remove_nonce(nonce_obj.id)
157
raise AuthTokenError(self, 'Incorrect id_token: nonce')
159
def validate_and_return_id_token(self, jws):
161
Validates the id_token according to the steps at
162
http://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation.
165
# Decode the JWT and raise an error if the sig is invalid
166
id_token = JWS().verify_compact(jws.encode('utf-8'),
167
self.get_jwks_keys())
168
except JWKESTException:
169
raise AuthTokenError(self, 'Signature verification failed')
171
self.validate_claims(id_token)
175
def request_access_token(self, *args, **kwargs):
177
Retrieve the access token. Also, validate the id_token and
178
store it (temporarily).
180
response = self.get_json(*args, **kwargs)
181
self.id_token = self.validate_and_return_id_token(response['id_token'])
184
def user_data(self, access_token, *args, **kwargs):
185
return self.get_json(self.userinfo_url(), headers={
186
'Authorization': 'Bearer {0}'.format(access_token)
189
def get_user_details(self, response):
190
username_key = self.setting('USERNAME_KEY', default=self.USERNAME_KEY)
192
'username': response.get(username_key),
193
'email': response.get('email'),
194
'fullname': response.get('name'),
195
'first_name': response.get('given_name'),
196
'last_name': response.get('family_name'),