~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/web/test/test_cgi.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.web.twcgi}.
 
6
"""
 
7
 
 
8
import sys, os
 
9
 
 
10
from twisted.trial import unittest
 
11
from twisted.internet import reactor, interfaces, error
 
12
from twisted.python import util, failure
 
13
from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR
 
14
from twisted.web import client, twcgi, server, resource
 
15
from twisted.web.test._util import _render
 
16
from twisted.web.test.test_web import DummyRequest
 
17
 
 
18
# Source of a minimal Python 2 CGI script: one header line, the blank line
# that ends the headers, then a single line of body text.  The literal must
# contain only the script itself (no viewer line numbers or padding lines).
DUMMY_CGI = '''\
print "Header: OK"
print
print "cgi output"
'''
 
23
 
 
24
# Source of a Python 2 CGI script that reads exactly CONTENT_LENGTH bytes
# (0 when the variable is absent) from stdin before answering.  The literal
# must contain only the script itself (no viewer line numbers or padding
# lines).
READINPUT_CGI = '''\
# this is an example of a correctly-written CGI script which reads a body
# from stdin, which only reads env['CONTENT_LENGTH'] bytes.

import os, sys

body_length = int(os.environ.get('CONTENT_LENGTH',0))
indata = sys.stdin.read(body_length)
print "Header: OK"
print
print "readinput ok"
'''
 
36
 
 
37
# Source of a Python 2 CGI script that reads stdin until end-of-file rather
# than honouring CONTENT_LENGTH.  The literal must contain only the script
# itself (no viewer line numbers or padding lines).
READALLINPUT_CGI = '''\
# this is an example of the typical (incorrect) CGI script which expects
# the server to close stdin when the body of the request is complete.
# A correct CGI should only read env['CONTENT_LENGTH'] bytes.

import sys

indata = sys.stdin.read()
print "Header: OK"
print
print "readallinput ok"
'''
 
49
 
 
50
class PythonScript(twcgi.FilteredScript):
    """
    A L{twcgi.FilteredScript} whose filter program is the Python interpreter
    running this test suite (C{sys.executable}).
    """
    # Interpreter path, under the attribute name used by twisted.web.
    filter = sys.executable
    # Same interpreter as a one-element tuple -- web2's version
    filters = (sys.executable,)
 
53
 
 
54
class CGI(unittest.TestCase):
    """
    Tests for L{twcgi.FilteredScript}.

    Each test writes one of the module-level CGI script sources to a
    temporary file, serves it through L{PythonScript} on a TCP port picked
    by the reactor, fetches C{/cgi} with L{client.getPage} and compares the
    response body with the text the script prints.
    """

    if not interfaces.IReactorProcess.providedBy(reactor):
        skip = "CGI tests require a functional reactor.spawnProcess()"

    # Listening port set by startServer.  The class-level default keeps
    # tearDown from raising AttributeError (and hiding the real error) when
    # a test fails before the server has been started.
    p = None

    def startServer(self, cgi):
        """
        Publish the script at C{cgi} as the C{"cgi"} child of a new
        L{server.Site} and start listening on port 0.

        @param cgi: path of the CGI script; the tests pass an absolute path.
        @return: the port number reported by the listening port.
        """
        root = resource.Resource()
        cgipath = util.sibpath(__file__, cgi)
        root.putChild("cgi", PythonScript(cgipath))
        site = server.Site(root)
        self.p = reactor.listenTCP(0, site)
        return self.p.getHost().port


    def tearDown(self):
        """
        Stop the listening port opened by L{startServer}, if there is one.

        @return: whatever C{stopListening} returns, or C{None} when no
            server was started.
        """
        if self.p:
            return self.p.stopListening()


    def _writeCGI(self, source):
        """
        Write C{source} to a fresh temporary file.

        @param source: the text of the CGI script.
        @return: the absolute path of the file that was written.
        """
        cgiFilename = os.path.abspath(self.mktemp())
        cgiFile = open(cgiFilename, 'wt')
        try:
            cgiFile.write(source)
        finally:
            # Close the handle even when the write fails.
            cgiFile.close()
        return cgiFilename


    def testCGI(self):
        """
        A GET request for L{DUMMY_CGI} yields the body the script prints.
        """
        portnum = self.startServer(self._writeCGI(DUMMY_CGI))
        d = client.getPage("http://localhost:%d/cgi" % portnum)
        d.addCallback(self._testCGI_1)
        return d

    def _testCGI_1(self, res):
        """
        Check the body fetched by L{testCGI}.
        """
        self.failUnlessEqual(res, "cgi output" + os.linesep)


    def testReadEmptyInput(self):
        """
        A GET request (no body) for L{READINPUT_CGI} still gets the script's
        reply.
        """
        portnum = self.startServer(self._writeCGI(READINPUT_CGI))
        d = client.getPage("http://localhost:%d/cgi" % portnum)
        d.addCallback(self._testReadEmptyInput_1)
        return d
    testReadEmptyInput.timeout = 5

    def _testReadEmptyInput_1(self, res):
        """
        Check the body fetched by L{testReadEmptyInput}.
        """
        self.failUnlessEqual(res, "readinput ok%s" % os.linesep)


    def testReadInput(self):
        """
        A POST with a body to L{READINPUT_CGI} gets the script's reply.
        """
        portnum = self.startServer(self._writeCGI(READINPUT_CGI))
        d = client.getPage("http://localhost:%d/cgi" % portnum,
                           method="POST",
                           postdata="Here is your stdin")
        d.addCallback(self._testReadInput_1)
        return d
    testReadInput.timeout = 5

    def _testReadInput_1(self, res):
        """
        Check the body fetched by L{testReadInput}.
        """
        self.failUnlessEqual(res, "readinput ok%s" % os.linesep)


    def testReadAllInput(self):
        """
        A POST with a body to L{READALLINPUT_CGI}, which reads stdin until
        end-of-file, gets the script's reply before the test times out.
        """
        portnum = self.startServer(self._writeCGI(READALLINPUT_CGI))
        d = client.getPage("http://localhost:%d/cgi" % portnum,
                           method="POST",
                           postdata="Here is your stdin")
        d.addCallback(self._testReadAllInput_1)
        return d
    testReadAllInput.timeout = 5

    def _testReadAllInput_1(self, res):
        """
        Check the body fetched by L{testReadAllInput}.
        """
        self.failUnlessEqual(res, "readallinput ok%s" % os.linesep)
 
135
 
 
136
 
 
137
 
 
138
class CGIDirectoryTests(unittest.TestCase):
    """
    Tests for L{twcgi.CGIDirectory}.
    """
    def test_render(self):
        """
        Rendering a L{twcgi.CGIDirectory} itself gives the response the
        HTTP I{NOT FOUND} status code.
        """
        request = DummyRequest([''])
        directory = twcgi.CGIDirectory(self.mktemp())
        return _render(directory, request).addCallback(
            lambda ignored: self.assertEqual(request.responseCode, NOT_FOUND))


    def test_notFoundChild(self):
        """
        L{twcgi.CGIDirectory.getChild} returns a resource which renders a
        response with the HTTP I{NOT FOUND} status code when the requested
        child is not an entry in the directory used to initialize the
        L{twcgi.CGIDirectory}.
        """
        emptyDir = self.mktemp()
        os.makedirs(emptyDir)
        request = DummyRequest(['foo'])
        missing = twcgi.CGIDirectory(emptyDir).getChild("foo", request)
        return _render(missing, request).addCallback(
            lambda ignored: self.assertEqual(request.responseCode, NOT_FOUND))
 
173
 
 
174
 
 
175
 
 
176
class CGIProcessProtocolTests(unittest.TestCase):
    """
    Tests for L{twcgi.CGIProcessProtocol}.
    """
    def test_prematureEndOfHeaders(self):
        """
        When the process attached to a L{CGIProcessProtocol} ends before it
        has finished writing its headers, the response is given the
        I{INTERNAL SERVER ERROR} status code.
        """
        request = DummyRequest([''])
        cgiProtocol = twcgi.CGIProcessProtocol(request)
        # Report process termination without having delivered any output.
        reason = failure.Failure(error.ProcessTerminated())
        cgiProtocol.processEnded(reason)
        self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)
 
190