~ubuntuone-pqm-team/canonical-identity-provider/trunk

« back to all changes in this revision

Viewing changes to identityprovider/models/captcha.py

  • Committer: Danny Tamez
  • Date: 2010-04-21 15:29:24 UTC
  • Revision ID: danny.tamez@canonical.com-20100421152924-lq1m92tstk2iz75a
Canonical SSO Provider (Open Source) - Initial Commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2010 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
import re
 
5
import traceback
 
6
import socket
 
7
import urllib
 
8
import urllib2
 
9
import logging
 
10
 
 
11
from django.conf import settings
 
12
 
 
13
 
 
14
class CaptchaResponse(object):
 
15
    """Response returned from _open() in case of error"""
 
16
 
 
17
    def __init__(self, code, response, tb=None):
 
18
        self.code = code
 
19
        self.response = response
 
20
        self.traceback = tb
 
21
        self.is_error = tb is not None
 
22
        self._data = None
 
23
 
 
24
    def data(self):
 
25
        if self._data:
 
26
            return self._data
 
27
        if self.response:
 
28
            self._data = self.response.read()
 
29
            return self._data
 
30
        else:
 
31
            return None
 
32
 
 
33
 
 
34
class Captcha(object):
 
35
    """
 
36
    Class to capture & abstract interaction with external captcha service.
 
37
 
 
38
    Current implementation uses reCaptcha service
 
39
 
 
40
    .. note: There's no possibility to actually test verification actually
 
41
             returning true, as response, for that you need human interaction.
 
42
 
 
43
    Getting new captcha is quite simple:
 
44
 
 
45
        >>> captcha = Captcha.new()
 
46
        >>> captcha.serialize() #+doctest: ELLIPSIS
 
47
        {'captcha_id': ..., 'image_url': ...}
 
48
 
 
49
    As is verifying received solution:
 
50
 
 
51
        >>> captcha = Captcha('captcha-id-received-from-client')
 
52
        >>> captcha.verify("this-is-invalid-solution")
 
53
        False
 
54
 
 
55
    Once verified solution is cached, so calling again to .verify() method is
 
56
    very cheap (and returns same result):
 
57
 
 
58
        >>> captcha.verify("this-is-invalid-solution")
 
59
        False
 
60
 
 
61
    You can also get original response from reCaptcha:
 
62
 
 
63
        >>> print captcha.response.data()
 
64
        true
 
65
        success
 
66
 
 
67
    """
 
68
 
 
69
    opener = None
 
70
 
 
71
    def __init__(self, env, captcha_id, image_url=None, response=None):
 
72
        self.env = env
 
73
        self.captcha_id = captcha_id
 
74
        self.image_url = image_url
 
75
        self.response = response
 
76
 
 
77
        self._verified = None
 
78
 
 
79
        self._setup_opener()
 
80
 
 
81
    @classmethod
 
82
    def _setup_opener(cls):
 
83
        if cls.opener is not None:
 
84
            return
 
85
        if getattr(settings, 'CAPTCHA_USE_PROXY', False):
 
86
            proxy_handler = urllib2.ProxyHandler(settings.CAPTCHA_PROXIES)
 
87
            opener = urllib2.build_opener(proxy_handler)
 
88
        else:
 
89
            opener = urllib2.build_opener()
 
90
        cls.opener = opener
 
91
 
 
92
    def serialize(self):
 
93
        return {
 
94
            'captcha_id': self.captcha_id,
 
95
            'image_url': self.image_url,
 
96
        }
 
97
 
 
98
    @classmethod
 
99
    def new(cls, env):
 
100
        cls._setup_opener()
 
101
        url = (settings.CAPTCHA_API_URL +
 
102
               '/challenge?k=%s' % settings.CAPTCHA_PUBLIC_KEY)
 
103
        response = cls._open(url, env)
 
104
        if response.is_error:
 
105
            env['oops-dump'] = True
 
106
            logging.warning(response.traceback)
 
107
            logging.warning("Failed to connect to reCaptcha server")
 
108
            return cls(env, None, None)
 
109
 
 
110
        data = response.data()
 
111
        m = re.search(r"challenge\s*:\s*'(.+?)'", data, re.M | re.S)
 
112
        if m:
 
113
            captcha_id = m.group(1)
 
114
            image_url = settings.CAPTCHA_IMAGE_URL_PATTERN % captcha_id
 
115
        else:
 
116
            captcha_id, image_url = None, None
 
117
        return cls(env, captcha_id, image_url, response)
 
118
 
 
119
    @classmethod
 
120
    def _open(cls, request, env):
 
121
        default_timeout = socket.getdefaulttimeout()
 
122
        socket.setdefaulttimeout(getattr(settings, 'CAPTCHA_TIMEOUT', 10))
 
123
 
 
124
        try:
 
125
            response = cls.opener.open(request)
 
126
        except urllib2.URLError, e:
 
127
            # Attribute depends weather we have HTTPError or URLError
 
128
            error_code = e.code if hasattr(e, 'code') else e.reason[0]
 
129
            if error_code not in (111, 113, 408, 500, 502, 503, 504):
 
130
                # 111: Connection refused
 
131
                # 113: No route to host
 
132
                # 408: Request Timeout
 
133
                # 500: Internal Server Error
 
134
                # 502: Bad Gateway
 
135
                # 503: Service Unavailable
 
136
                # 504: Gateway Timeout
 
137
                raise
 
138
            tb = traceback.format_exc()
 
139
            return CaptchaResponse(error_code, None, tb)
 
140
        finally:
 
141
            socket.setdefaulttimeout(default_timeout)
 
142
 
 
143
        return CaptchaResponse(response.code, response, None)
 
144
 
 
145
    def verify(self, captcha_solution):
 
146
        if self._verified is not None:
 
147
            return self._verified
 
148
 
 
149
        if getattr(settings, 'DISABLE_CAPTCHA_VERIFICATION', False):
 
150
            self.response = None
 
151
            return True
 
152
 
 
153
        request_data = urllib.urlencode({
 
154
            'privatekey': settings.CAPTCHA_PRIVATE_KEY,
 
155
            'remoteip': self.env['REMOTE_ADDR'],
 
156
            'challenge': self.captcha_id,
 
157
            'response': captcha_solution,
 
158
        })
 
159
        request = urllib2.Request(settings.CAPTCHA_VERIFY_URL, request_data)
 
160
        self.response = self._open(request, self.env)
 
161
 
 
162
        if not self.response.is_error:
 
163
            response_data = self.response.data()
 
164
            self.verified, self.message = response_data.split('\n', 1)
 
165
            self._verified = self.verified.lower() == 'true'
 
166
        elif self.captcha_id is None:
 
167
            self.message = 'no-challenge'
 
168
            self._verfied = False
 
169
        else:
 
170
            self._verified = False
 
171
            if getattr(self.env, '__setitem__', False):
 
172
                self.env['oops-dump'] = True
 
173
            logging.warning(self.response.traceback)
 
174
            logging.warning("reCaptcha connection error")
 
175
        return self._verified