~roadmr/canonical-identity-provider/update-raven

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Copyright 2011-2013 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
import logging
from urlparse import urlparse, urlunparse

from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.datastructures import MultiValueDictKeyError
from django.views.decorators.csrf import csrf_exempt
from saml2idp import registry, saml2idp_metadata, xml_signing
from saml2idp.exceptions import CannotHandleAssertion
from saml2idp.views import (
    _generate_response,
    login_init,
    login_process,
)

from ubuntu_sso_saml.models import SAMLConfig
from ubuntu_sso_saml.processors import InfoProcessor
from ubuntu_sso_saml.utils import (
    get_config_and_link_for_resource,
    get_deeplink_url,
)

# XXX: we should not import webui here
from webui.decorators import sso_login_required

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


def strip_url_path(url):
    """Reduce ``url`` to ``scheme://host[:port]`` for diagnostic display.

    Everything after the network location (path, params, query string and
    fragment) is removed, and so is any ``user:password@`` prefix inside
    the network location, to avoid exposing possible secrets.

    Since this information is used for diagnostic purposes, this is not
    ideal because it may chop off important parts of the URL, but it's a
    reasonable compromise since the most common problem is a bad referer or
    ACS_URL base path, which is how processors filter requests.

    :param url: the URL string to sanitise.
    :returns: the URL rebuilt from its scheme and host only; an empty
        string when ``url`` has neither a scheme nor a network location.
    """
    scheme, netloc = urlparse(url)[0:2]
    # Credentials embedded in the authority part are secrets too; keep only
    # what follows the last '@' (the host and optional port).
    netloc = netloc.rpartition('@')[2]
    return urlunparse((scheme, netloc, '', '', '', ''))


@csrf_exempt
def saml_begin(request):
    """
    Entry point for a SAML 2.0 AuthnRequest sent by a Service Provider.

    The ``SAMLRequest`` and ``RelayState`` values are read from the POST
    data (for POST requests) or from the query string (otherwise), saved in
    the session, and the client is redirected to ``login_process``.

    When no ``SAMLRequest`` parameter was sent, the problem is logged and
    the ``no_saml.html`` page is rendered instead, showing the stripped
    referer when one is available.
    """
    params = request.POST if request.method == 'POST' else request.GET

    try:
        saml_request = params['SAMLRequest']
    except MultiValueDictKeyError as e:
        logger.exception(
            "Access to /+saml endpoint with no SAML request: %s", e)
        context = {}
        referer = request.META.get('HTTP_REFERER')
        if referer:
            context['http_referer'] = strip_url_path(referer)
        return render(request, 'ubuntu_sso_saml/no_saml.html', context)

    # Keep these in the session now, because Django's login cycle won't
    # preserve them.
    request.session['SAMLRequest'] = saml_request
    request.session['RelayState'] = params.get('RelayState')
    return redirect('login_process')


@sso_login_required
def saml_init(request, resource, **kwargs):
    """
    Start an IdP-initiated login towards a simple SP resource/target URL.

    The SP configuration for ``resource`` is looked up first; when one is
    found the response is generated through the CanonicalProcessor,
    otherwise the request is handed to ``login_init`` unchanged.
    """
    sp_config, link = get_config_and_link_for_resource(resource)

    if sp_config is None:
        # No SP entry was found: fall back to settings-based processing.
        return login_init(request, resource, **kwargs)

    # An SP entry exists, so it must be handled by the CanonicalProcessor.
    processor = registry.get_processor(
        'ubuntu_sso_saml.processors.CanonicalProcessor')
    deeplink = get_deeplink_url(resource, link)
    processor.init_deep_link(request, sp_config, deeplink)
    return _generate_response(request, processor)


@sso_login_required
def saml_process(request):
    """
    Process the pending SAML assertion, with a friendly error page.

    Delegates to ``login_process``. If that raises CannotHandleAssertion,
    the request is inspected with InfoProcessor to find out which ACS URL
    it came from, the failure is logged, and the
    ``saml_remote_not_found.html`` page is rendered with that (stripped)
    URL in the ``acs_url`` context variable.
    """
    # Now actually try to process the assertion.
    try:
        return login_process(request)
    except CannotHandleAssertion:
        # Try to get more info from the assertion to show a nice error page
        try:
            info_processor = InfoProcessor()
            req_params = info_processor.get_request_params(request)
            acs_url = req_params.get('ACS_URL')
            if acs_url:
                acs_url = strip_url_path(acs_url)
            # strip_url_path() yields '' for anything that lacks a scheme
            # and host, so the fallback text has to be applied after
            # stripping or it would never be shown.
            if not acs_url:
                acs_url = 'Unknown requester'
        except (CannotHandleAssertion, KeyError) as exc:
            logger.exception("Can't peek into SAML request: %s", exc)
            acs_url = "a non-SAML request."
        logger.exception("Can't handle assertion from %s", acs_url)
        context = {"acs_url": acs_url}
        return render(
            request, 'ubuntu_sso_saml/saml_remote_not_found.html', context)


# backported descriptor view (from saml2idp.views) as it needed fixing
def saml_metadata(request, sp_id=None):
    """
    Replies with the XML Metadata IDSSODescriptor.

    The descriptor is rendered from the ``SAML2IDP_CONFIG`` issuer and
    certificate file plus the absolute ``logout`` and ``login_begin`` URLs.
    When ``sp_id`` is given, the matching SAMLConfig is fetched (404 if it
    does not exist) and, if it carries its own certificate, that
    certificate is published instead of the configured one.
    """
    idp_config = saml2idp_metadata.SAML2IDP_CONFIG
    issuer = idp_config['issuer']
    logout_url = request.build_absolute_uri(reverse('logout'))
    login_url = request.build_absolute_uri(reverse('login_begin'))
    public_key = xml_signing.load_cert_data(idp_config['certificate_file'])

    if sp_id:
        provider = get_object_or_404(SAMLConfig, pk=sp_id)
        if provider.certificate:
            # A per-SP certificate overrides the configured default.
            public_key = xml_signing.load_cert_data_from_string(
                str(provider.certificate))

    return render(
        request,
        'saml2idp/idpssodescriptor.xml',
        {
            'entity_id': issuer,
            'cert_public_key': public_key,
            'slo_url': logout_url,
            'sso_url': login_url,
        },
        content_type='application/xml',
    )