1
# Copyright (c) 2009 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.web.twcgi}.
10
from twisted.trial import unittest
11
from twisted.internet import reactor, interfaces, error
12
from twisted.python import util, failure
13
from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR
14
from twisted.web import client, twcgi, server, resource
15
from twisted.web.test._util import _render
16
from twisted.web.test.test_web import DummyRequest
25
# this is an example of a correctly-written CGI script which reads a body
26
# from stdin, which only reads env['CONTENT_LENGTH'] bytes.
30
body_length = int(os.environ.get('CONTENT_LENGTH',0))
31
indata = sys.stdin.read(body_length)
37
READALLINPUT_CGI = '''\
38
# this is an example of the typical (incorrect) CGI script which expects
39
# the server to close stdin when the body of the request is complete.
40
# A correct CGI should only read env['CONTENT_LENGTH'] bytes.
44
indata = sys.stdin.read()
47
print "readallinput ok"
50
class PythonScript(twcgi.FilteredScript):
51
filter = sys.executable
52
filters = sys.executable, # web2's version
54
class CGI(unittest.TestCase):
56
Tests for L{twcgi.FilteredScript}.
59
if not interfaces.IReactorProcess.providedBy(reactor):
60
skip = "CGI tests require a functional reactor.spawnProcess()"
62
def startServer(self, cgi):
63
root = resource.Resource()
64
cgipath = util.sibpath(__file__, cgi)
65
root.putChild("cgi", PythonScript(cgipath))
66
site = server.Site(root)
67
self.p = reactor.listenTCP(0, site)
68
return self.p.getHost().port
72
return self.p.stopListening()
76
cgiFilename = os.path.abspath(self.mktemp())
77
cgiFile = file(cgiFilename, 'wt')
78
cgiFile.write(DUMMY_CGI)
81
portnum = self.startServer(cgiFilename)
82
d = client.getPage("http://localhost:%d/cgi" % portnum)
83
d.addCallback(self._testCGI_1)
85
def _testCGI_1(self, res):
86
self.failUnlessEqual(res, "cgi output" + os.linesep)
89
def testReadEmptyInput(self):
90
cgiFilename = os.path.abspath(self.mktemp())
91
cgiFile = file(cgiFilename, 'wt')
92
cgiFile.write(READINPUT_CGI)
95
portnum = self.startServer(cgiFilename)
96
d = client.getPage("http://localhost:%d/cgi" % portnum)
97
d.addCallback(self._testReadEmptyInput_1)
99
testReadEmptyInput.timeout = 5
100
def _testReadEmptyInput_1(self, res):
101
self.failUnlessEqual(res, "readinput ok%s" % os.linesep)
103
def testReadInput(self):
104
cgiFilename = os.path.abspath(self.mktemp())
105
cgiFile = file(cgiFilename, 'wt')
106
cgiFile.write(READINPUT_CGI)
109
portnum = self.startServer(cgiFilename)
110
d = client.getPage("http://localhost:%d/cgi" % portnum,
112
postdata="Here is your stdin")
113
d.addCallback(self._testReadInput_1)
115
testReadInput.timeout = 5
116
def _testReadInput_1(self, res):
117
self.failUnlessEqual(res, "readinput ok%s" % os.linesep)
120
def testReadAllInput(self):
121
cgiFilename = os.path.abspath(self.mktemp())
122
cgiFile = file(cgiFilename, 'wt')
123
cgiFile.write(READALLINPUT_CGI)
126
portnum = self.startServer(cgiFilename)
127
d = client.getPage("http://localhost:%d/cgi" % portnum,
129
postdata="Here is your stdin")
130
d.addCallback(self._testReadAllInput_1)
132
testReadAllInput.timeout = 5
133
def _testReadAllInput_1(self, res):
134
self.failUnlessEqual(res, "readallinput ok%s" % os.linesep)
138
class CGIDirectoryTests(unittest.TestCase):
140
Tests for L{twcgi.CGIDirectory}.
142
def test_render(self):
144
L{twcgi.CGIDirectory.render} sets the HTTP response code to I{NOT
147
resource = twcgi.CGIDirectory(self.mktemp())
148
request = DummyRequest([''])
149
d = _render(resource, request)
150
def cbRendered(ignored):
151
self.assertEqual(request.responseCode, NOT_FOUND)
152
d.addCallback(cbRendered)
156
def test_notFoundChild(self):
158
L{twcgi.CGIDirectory.getChild} returns a resource which renders an
159
response with the HTTP I{NOT FOUND} status code if the indicated child
160
does not exist as an entry in the directory used to initialized the
161
L{twcgi.CGIDirectory}.
165
resource = twcgi.CGIDirectory(path)
166
request = DummyRequest(['foo'])
167
child = resource.getChild("foo", request)
168
d = _render(child, request)
169
def cbRendered(ignored):
170
self.assertEqual(request.responseCode, NOT_FOUND)
171
d.addCallback(cbRendered)
176
class CGIProcessProtocolTests(unittest.TestCase):
178
Tests for L{twcgi.CGIProcessProtocol}.
180
def test_prematureEndOfHeaders(self):
182
If the process communicating with L{CGIProcessProtocol} ends before
183
finishing writing out headers, the response has I{INTERNAL SERVER
184
ERROR} as its status code.
186
request = DummyRequest([''])
187
protocol = twcgi.CGIProcessProtocol(request)
188
protocol.processEnded(failure.Failure(error.ProcessTerminated()))
189
self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)