~ubuntuone-pqm-team/canonical-identity-provider/trunk

« back to all changes in this revision

Viewing changes to wsgi_test_server.py

  • Committer: Danny Tamez
  • Date: 2010-04-21 15:29:24 UTC
  • Revision ID: danny.tamez@canonical.com-20100421152924-lq1m92tstk2iz75a
Canonical SSO Provider (Open Source) - Initial Commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2010 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
# Run the app using this WSGI server to run the doctests.
 
5
# You'll need to make this accessible on port 80 for the tests to work.
 
6
 
 
7
import os
 
8
import signal
 
9
import sys
 
10
sys.path.append('..')
 
11
if 'DJANGO_SETTINGS_MODULE' not in os.environ:
 
12
    os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'
 
13
import time
 
14
 
 
15
from django.conf import settings
 
16
from django.core import mail
 
17
from django.db import connection
 
18
from django.test.utils import TestSMTPConnection
 
19
 
 
20
from wsgiref.simple_server import (make_server, WSGIRequestHandler,
 
21
                                   ServerHandler)
 
22
from wsgiref.handlers import format_date_time
 
23
from wsgiref.headers import Headers
 
24
from identityprovider.wsgi import make_app
 
25
 
 
26
 
 
27
# Overriding some default settings
 
28
settings.SSO_ROOT_URL = 'http://openid.launchpad.dev/'
 
29
settings.SSO_PROVIDER_URL = 'http://openid.launchpad.dev/+openid'
 
30
settings.OPENID_PREAUTHORIZATION_ACL = (
 
31
    ('http://launchpad.dev/', 'http://launchpad.dev/'),
 
32
)
 
33
settings.SSO_RESTRICT_RP = False
 
34
# Set up email sending into a sandbox
 
35
mail.SMTPConnection = TestSMTPConnection
 
36
mail.outbox = []
 
37
 
 
38
server_port = 80  # We need to be run here for the LP tests to find us
 
39
 
 
40
app = make_app()
 
41
 
 
42
 
 
43
class TestingHeaders(Headers):
 
44
    """ Same as regular Headers, but join with \n instead of \r\n """
 
45
    def __str__(self):
 
46
        return '\n'.join(["%s: %s" % kv for kv in self._headers] + ['', ''])
 
47
 
 
48
 
 
49
class StatusServerHandler(ServerHandler):
 
50
    """ Same as regular ServerHandler, but send a duplicate Status header
 
51
        to compensate for zope.testbrowser's behaviour.
 
52
        Also, join with \n instead of \r\n.
 
53
    """
 
54
    headers_class = TestingHeaders
 
55
 
 
56
    def send_preamble(self):
 
57
        if self.client_is_modern():
 
58
            self._write('HTTP/%s %s\n' % (self.http_version, self.status))
 
59
            self._write('Status: %s\n' % (self.status.title(),))
 
60
            if not self.headers.has_key('Date'):
 
61
                self._write(
 
62
                    'Date: %s\n' % format_date_time(time.time())
 
63
                )
 
64
            if self.server_software and not self.headers.has_key('Server'):
 
65
                self._write('Server: %s\n' % self.server_software)
 
66
 
 
67
 
 
68
class StatusWSGIRequestHandler(WSGIRequestHandler):
 
69
    """
 
70
    Just like regular WSGIRequestHandlers, but uses
 
71
    StatusServerHandler instead of ServerHandler
 
72
    """
 
73
 
 
74
    verbose = False
 
75
 
 
76
    def handle(self):
 
77
        self.raw_requestline = self.rfile.readline()
 
78
        if not self.parse_request():  # An error code has been sent, just exit
 
79
            return
 
80
 
 
81
        handler = StatusServerHandler(
 
82
            self.rfile, self.wfile, self.get_stderr(), self.get_environ()
 
83
        )
 
84
        handler.request_handler = self      # backpointer for logging
 
85
        handler.run(self.server.get_app())
 
86
        if connection.connection is not None:
 
87
            connection.connection.close()
 
88
        connection.connection = None
 
89
 
 
90
    def log_request(self, format, *args):
 
91
        if self.verbose:
 
92
            WSGIRequestHandler.log_request(self, format, *args)
 
93
 
 
94
 
 
95
httpd = make_server('', server_port, app,
 
96
                    handler_class=StatusWSGIRequestHandler)
 
97
 
 
98
# Setup code coverage tracking
 
99
import coverage
 
100
 
 
101
cov = coverage.coverage(auto_data=True, data_suffix=True)
 
102
cov.start()
 
103
 
 
104
if '-v' in sys.argv:
 
105
    StatusWSGIRequestHandler.verbose = True
 
106
 
 
107
 
 
108
# This snippet is required to make the coverage report complete
 
109
def signal_handler(signum, frame):
 
110
    sys.exit(0)
 
111
signal.signal(signal.SIGTERM, signal_handler)
 
112
 
 
113
print "Serving HTTP on port %s..." % server_port
 
114
 
 
115
# Respond to requests until process is killed
 
116
httpd.serve_forever()