5
from twisted.trial import unittest
6
from twisted.internet import reactor, interfaces
7
from twisted.python import util
8
from twisted.web2 import static, twcgi, server, resource, channel, http, iweb
11
from twisted.web import client
13
# No twisted.web installed, and no web2 client yet.
23
# this is an example of a correctly-written CGI script which reads a body
24
# from stdin, which only reads env['CONTENT_LENGTH'] bytes.
28
body_length = int(os.environ.get('CONTENT_LENGTH',0))
29
indata = sys.stdin.read(body_length)
35
READALLINPUT_CGI = '''\
36
# this is an example of the typical (incorrect) CGI script which expects
37
# the server to close stdin when the body of the request is complete.
38
# A correct CGI should only read env['CONTENT_LENGTH'] bytes.
42
indata = sys.stdin.read()
45
print "readallinput ok"
48
class PythonScript(twcgi.FilteredScript):
    """
    A C{twcgi.FilteredScript} whose filter program is the Python
    interpreter running this test suite (C{sys.executable}).

    Both attribute spellings are set to the same interpreter path.
    """
    # NOTE(review): the singular name is presumably kept for the older
    # twisted.web-style attribute -- confirm against twcgi.FilteredScript.
    filter = sys.executable

    # web2's version
    filters = (sys.executable,)
52
class CGI(unittest.TestCase):
54
def startServer(self, cgi):
    """
    Publish the script C{cgi} as the child C{"cgi"} of a fresh root
    resource and start listening for HTTP on an OS-assigned TCP port.

    The listening port object is kept on C{self.p}.

    @param cgi: path of the script, resolved with C{util.sibpath}
        relative to this module's file.
    @return: the port number the server is listening on.
    """
    tree = resource.Resource()
    script = PythonScript(util.sibpath(__file__, cgi))
    tree.putChild("cgi", script)

    factory = channel.HTTPFactory(server.Site(tree))
    # Port 0 asks the OS for any free port; the real number is read back
    # from the listening port below.
    self.p = reactor.listenTCP(0, factory)
    address = self.p.getHost()
    return address.port
64
return self.p.stopListening()
67
cgiFilename = os.path.abspath(self.mktemp())
68
cgiFile = file(cgiFilename, 'wt')
69
cgiFile.write(DUMMY_CGI)
72
portnum = self.startServer(cgiFilename)
73
d = client.getPage("http://localhost:%d/cgi" % portnum)
74
d.addCallback(self._testCGI_1)
76
def _testCGI_1(self, res):
    """
    Callback: check that the fetched page body C{res} is the line
    C{cgi output} terminated by the platform line separator.
    """
    expected = "cgi output%s" % os.linesep
    self.failUnlessEqual(res, expected)
80
def testReadEmptyInput(self):
    """
    Write C{READINPUT_CGI} to a temporary file, serve it, and fetch it
    with no request body; the response body is checked by
    L{_testReadEmptyInput_1}.

    @return: the C{Deferred} from C{client.getPage}, so that trial waits
        for the request (and the assertion callback) to finish.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(READINPUT_CGI)
    finally:
        # Closing flushes the script text to disk before it is served;
        # try/finally rather than 'with' keeps old-Python compatibility.
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    d = client.getPage("http://localhost:%d/cgi" % portnum)
    d.addCallback(self._testReadEmptyInput_1)
    # Without returning the Deferred the test would end before the
    # callback's assertion ever ran.
    return d
91
def _testReadEmptyInput_1(self, res):
    """
    Callback: check that the fetched page body C{res} is the line
    C{readinput ok} terminated by the platform line separator.
    """
    expected = "readinput ok%s" % os.linesep
    self.failUnlessEqual(res, expected)
94
def testReadInput(self):
    """
    Write C{READINPUT_CGI} to a temporary file, serve it, and send it a
    request carrying a body; the response body is checked by
    L{_testReadInput_1}.

    @return: the C{Deferred} from C{client.getPage}, so that trial waits
        for the request (and the assertion callback) to finish.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(READINPUT_CGI)
    finally:
        # Closing flushes the script text to disk before it is served;
        # try/finally rather than 'with' keeps old-Python compatibility.
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    # A request body belongs with POST; getPage would otherwise use its
    # default method (GET) while still sending the body.
    d = client.getPage("http://localhost:%d/cgi" % portnum,
                       method="POST",
                       postdata="Here is your stdin")
    d.addCallback(self._testReadInput_1)
    # Without returning the Deferred the test would end before the
    # callback's assertion ever ran.
    return d
107
def _testReadInput_1(self, res):
    """
    Callback: check that the fetched page body C{res} is the line
    C{readinput ok} terminated by the platform line separator.
    """
    expected = "readinput ok%s" % os.linesep
    self.failUnlessEqual(res, expected)
111
def testRealAllInput(self):
    """
    Write C{READALLINPUT_CGI} to a temporary file, serve it, and send it
    a request carrying a body; the response body is checked by
    L{_testRealAllInput_1}.

    @return: the C{Deferred} from C{client.getPage}, so that trial waits
        for the request (and the assertion callback) to finish.
    """
    cgiFilename = os.path.abspath(self.mktemp())
    cgiFile = open(cgiFilename, 'wt')
    try:
        cgiFile.write(READALLINPUT_CGI)
    finally:
        # Closing flushes the script text to disk before it is served;
        # try/finally rather than 'with' keeps old-Python compatibility.
        cgiFile.close()

    portnum = self.startServer(cgiFilename)
    # A request body belongs with POST; getPage would otherwise use its
    # default method (GET) while still sending the body.
    d = client.getPage("http://localhost:%d/cgi" % portnum,
                       method="POST",
                       postdata="Here is your stdin")
    d.addCallback(self._testRealAllInput_1)
    # Without returning the Deferred the test would end before the
    # callback's assertion ever ran.
    return d
124
def _testRealAllInput_1(self, res):
    """
    Callback: check that the fetched page body C{res} is the line
    C{readallinput ok} terminated by the platform line separator.
    """
    expected = "readallinput ok%s" % os.linesep
    self.failUnlessEqual(res, expected)
127
if not interfaces.IReactorProcess.providedBy(reactor):
128
CGI.skip = "CGI tests require a functional reactor.spawnProcess()"
131
CGI.skip = "CGI tests require a twisted.web at the moment."
133
class CGIDirectoryTest(unittest.TestCase):
138
cgiFile = open(os.path.join(temp, 'dummy'), 'wt')
139
cgiFile.write(DUMMY_CGI)
142
os.mkdir(os.path.join(temp, 'directory'))
144
self.root = twcgi.CGIDirectory(temp)
146
def testNotFound(self):
    """
    C{locateChild} on the CGI directory must raise C{http.HTTPError}
    for the segment C{'notHere'}.
    """
    # NOTE(review): presumably no entry called 'notHere' exists in the
    # fixture directory -- confirm against setUp.
    segments = ('notHere',)
    self.assertRaises(http.HTTPError,
                      self.root.locateChild, None, segments)
151
def testCantRender(self):
    """
    Rendering the CGI directory itself must produce an object providing
    C{iweb.IResponse} whose C{code} is 403.
    """
    rendered = self.root.render(None)

    isResponse = iweb.IResponse.providedBy(rendered)
    self.failUnless(isResponse)
    self.assertEquals(rendered.code, 403)
157
def testFoundScript(self):
    """
    Looking up C{'dummy'} in the CGI directory must consume every path
    segment and yield a C{twcgi.CGIScript}.
    """
    # Named 'child' locally so the imported 'resource' module is not
    # shadowed inside this method.
    child, remaining = self.root.locateChild(None, ('dummy',))

    self.assertEquals(remaining, ())

    self.failUnless(isinstance(child, twcgi.CGIScript))
164
def testSubDirectory(self):
165
resource, segments = self.root.locateChild(None, ('directory',
171
self.failUnless(isinstance(resource, twcgi.CGIDirectory))
173
def createScript(self, filename):
174
cgiFile = open(filename, 'wt')
175
cgiFile.write("#!%s\n\n%s" % (sys.executable,
178
os.chmod(filename, 0700)
180
def testScriptsExecute(self):
181
cgiBinDir = os.path.abspath(self.mktemp())
183
root = twcgi.CGIDirectory(cgiBinDir)
185
self.createScript(os.path.join(cgiBinDir, 'dummy'))
187
cgiSubDir = os.path.join(cgiBinDir, 'sub')
190
self.createScript(os.path.join(cgiSubDir, 'dummy'))
192
self.p = reactor.listenTCP(0, channel.HTTPFactory(server.Site(root)))
193
portnum = self.p.getHost().port
195
def _firstResponse(res):
196
self.failUnlessEqual(res, "cgi output%s" % os.linesep)
198
return client.getPage('http://localhost:%d/sub/dummy' % portnum)
200
def _secondResponse(res):
201
self.failUnlessEqual(res, "cgi output%s" % os.linesep)
204
d = self.p.stopListening()
205
d.addCallback(lambda ign: res)
208
d = client.getPage('http://localhost:%d/dummy' % portnum)
210
d.addCallback(_firstResponse
211
).addCallback(_secondResponse