~landscape/zope3/ztk-1.1.3

« back to all changes in this revision

Viewing changes to src/twisted/web2/test/test_cgi.py

  • Committer: Andreas Hasenack
  • Date: 2009-07-20 17:49:16 UTC
  • Revision ID: andreas@canonical.com-20090720174916-g2tn6qmietz2hn0u
Revert twisted removal, it breaks several dozen tests [trivial]

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/python
 
2
 
 
3
import sys, os
 
4
 
 
5
from twisted.trial import unittest
 
6
from twisted.internet import reactor, interfaces
 
7
from twisted.python import util
 
8
from twisted.web2 import static, twcgi, server, resource, channel, http, iweb
 
9
 
 
10
try:
 
11
    from twisted.web import client
 
12
except ImportError:
 
13
    # No twisted.web installed, and no web2 client yet.
 
14
    client = None
 
15
 
 
16
# Source of a minimal CGI script (Python 2 syntax): it emits one header, the
# blank line that ends the header section, and a one-line body.  The tests
# below expect the body "cgi output" followed by a line separator.
DUMMY_CGI = '''\
print "Header: OK"
print
print "cgi output"
'''
 
21
 
 
22
# Source of a CGI script (Python 2 syntax) that reads exactly CONTENT_LENGTH
# bytes of request body from stdin before answering.  With no CONTENT_LENGTH
# in the environment it reads zero bytes.
READINPUT_CGI = '''\
# this is an example of a correctly-written CGI script which reads a body
# from stdin, which only reads env['CONTENT_LENGTH'] bytes.

import os, sys

body_length = int(os.environ.get('CONTENT_LENGTH',0))
indata = sys.stdin.read(body_length)
print "Header: OK"
print
print "readinput ok"
'''
 
34
 
 
35
# Source of a CGI script (Python 2 syntax) that reads stdin until EOF instead
# of honouring CONTENT_LENGTH, so it only finishes once its stdin is closed.
READALLINPUT_CGI = '''\
# this is an example of the typical (incorrect) CGI script which expects
# the server to close stdin when the body of the request is complete.
# A correct CGI should only read env['CONTENT_LENGTH'] bytes.

import sys

indata = sys.stdin.read()
print "Header: OK"
print
print "readallinput ok"
'''
 
47
 
 
48
class PythonScript(twcgi.FilteredScript):
    """
    A filtered CGI script that is run through the current Python interpreter.
    """
    # web2's FilteredScript reads the plural, tuple-valued attribute.
    filters = (sys.executable,)
    # Singular spelling kept alongside it -- presumably for the older
    # twisted.web-style FilteredScript API; confirm before removing.
    filter = sys.executable
 
51
 
 
52
class CGI(unittest.TestCase):
    """
    Serve small Python CGI scripts over HTTP and check the response bodies.

    Each test writes one of the module-level script sources to a temporary
    file, publishes it as the C{cgi} child of a fresh site, fetches it with
    C{client.getPage} and compares the body with the script's expected output.
    """
    # Listening port created by startServer(); None until a server is started.
    p = None

    def startServer(self, cgi):
        """
        Publish the script at path C{cgi} as C{/cgi} and start listening.

        @param cgi: path of the CGI script file to serve.
        @return: the TCP port number the server is listening on.
        """
        root = resource.Resource()
        cgipath = util.sibpath(__file__, cgi)
        root.putChild("cgi", PythonScript(cgipath))
        site = server.Site(root)
        # Port 0 is requested, so the real port is read back from the
        # listening port object.
        self.p = reactor.listenTCP(0, channel.HTTPFactory(site))
        return self.p.getHost().port

    def tearDown(self):
        """
        Stop the server started by the test, if any.

        @return: the result of C{stopListening()} when a server was started,
            otherwise C{None}.
        """
        if self.p:
            return self.p.stopListening()

    def _writeScript(self, source):
        """
        Write C{source} to a fresh temporary file.

        @param source: text of the CGI script.
        @return: absolute path of the file that was written.
        """
        cgiFilename = os.path.abspath(self.mktemp())
        cgiFile = open(cgiFilename, 'wt')
        try:
            cgiFile.write(source)
        finally:
            # Close the handle even if the write fails.
            cgiFile.close()
        return cgiFilename

    def _fetchScript(self, source, **kwargs):
        """
        Serve C{source} as C{/cgi} and request it.

        @param source: text of the CGI script to serve.
        @param kwargs: extra keyword arguments passed to C{client.getPage}
            (for example C{method} and C{postdata}).
        @return: whatever C{client.getPage} returns for the request.
        """
        portnum = self.startServer(self._writeScript(source))
        return client.getPage("http://localhost:%d/cgi" % portnum, **kwargs)

    def testCGI(self):
        """
        A plain GET of the dummy script returns the script's body.
        """
        d = self._fetchScript(DUMMY_CGI)
        d.addCallback(self._testCGI_1)
        return d

    def _testCGI_1(self, res):
        self.failUnlessEqual(res, "cgi output%s" % os.linesep)

    def testReadEmptyInput(self):
        """
        A GET without a request body is answered by the script that reads
        only CONTENT_LENGTH bytes.
        """
        d = self._fetchScript(READINPUT_CGI)
        d.addCallback(self._testReadEmptyInput_1)
        return d

    def _testReadEmptyInput_1(self, res):
        self.failUnlessEqual(res, "readinput ok%s" % os.linesep)

    def testReadInput(self):
        """
        A POST with a body is answered by the script that reads only
        CONTENT_LENGTH bytes.
        """
        d = self._fetchScript(READINPUT_CGI,
                              method="POST",
                              postdata="Here is your stdin")
        d.addCallback(self._testReadInput_1)
        return d

    def _testReadInput_1(self, res):
        self.failUnlessEqual(res, "readinput ok%s" % os.linesep)

    # NOTE(review): the name reads like a typo for "testReadAllInput"; it is
    # kept unchanged so existing test ids keep working.
    def testRealAllInput(self):
        """
        A POST with a body is answered by the script that reads stdin until
        EOF.
        """
        d = self._fetchScript(READALLINPUT_CGI,
                              method="POST",
                              postdata="Here is your stdin")
        d.addCallback(self._testRealAllInput_1)
        return d

    def _testRealAllInput_1(self, res):
        self.failUnlessEqual(res, "readallinput ok%s" % os.linesep)
 
126
 
 
127
# Skip the whole CGI test case when its prerequisites are missing.  A missing
# HTTP client takes precedence over a reactor without process support.
if not client:
    CGI.skip = "CGI tests require a twisted.web at the moment."
elif not interfaces.IReactorProcess.providedBy(reactor):
    CGI.skip = "CGI tests require a functional reactor.spawnProcess()"
 
132
 
 
133
class CGIDirectoryTest(unittest.TestCase):
    """
    Tests for C{twcgi.CGIDirectory}: child lookup, rendering of the directory
    itself, and execution of scripts found in it and in a subdirectory.
    """

    def setUp(self):
        """
        Create a temporary directory holding a C{dummy} script and an empty
        C{directory} subdirectory, and wrap it in a C{CGIDirectory}.
        """
        temp = self.mktemp()
        os.mkdir(temp)

        cgiFile = open(os.path.join(temp, 'dummy'), 'wt')
        try:
            cgiFile.write(DUMMY_CGI)
        finally:
            # Close the handle even if the write fails.
            cgiFile.close()

        os.mkdir(os.path.join(temp, 'directory'))

        self.root = twcgi.CGIDirectory(temp)

    def testNotFound(self):
        """
        Looking up a name that does not exist raises C{http.HTTPError}.
        """
        self.assertRaises(http.HTTPError,
                          self.root.locateChild, None, ('notHere',))

    def testCantRender(self):
        """
        Rendering the directory itself yields a 403 response.
        """
        response = self.root.render(None)

        self.failUnless(iweb.IResponse.providedBy(response))
        self.assertEquals(response.code, 403)

    def testFoundScript(self):
        """
        Looking up an existing file yields a C{CGIScript} and consumes all
        path segments.
        """
        # Named "child" so the imported "resource" module is not shadowed.
        child, segments = self.root.locateChild(None, ('dummy',))

        self.assertEquals(segments, ())

        self.failUnless(isinstance(child, (twcgi.CGIScript,)))

    def testSubDirectory(self):
        """
        Looking up a subdirectory yields another C{CGIDirectory}, whatever
        segments follow its name.
        """
        child, segments = self.root.locateChild(None, ('directory',
                                                       'paths',
                                                       'that',
                                                       'dont',
                                                       'matter'))

        self.failUnless(isinstance(child, twcgi.CGIDirectory))

    def createScript(self, filename):
        """
        Write the dummy CGI script, prefixed with a C{#!} line for the current
        interpreter, to C{filename} and make it executable by its owner only.

        @param filename: path of the script file to create.
        """
        # Imported here so the block does not depend on a module-level import.
        import stat

        cgiFile = open(filename, 'wt')
        try:
            cgiFile.write("#!%s\n\n%s" % (sys.executable,
                                          DUMMY_CGI))
        finally:
            cgiFile.close()
        # S_IRWXU is mode 0700 (read/write/execute for the owner), spelled in
        # a way that is valid on every Python version.
        os.chmod(filename, stat.S_IRWXU)

    def testScriptsExecute(self):
        """
        Scripts in the CGI directory and in a subdirectory of it can both be
        fetched over HTTP and produce the dummy script's output.

        @return: a Deferred that fires once both requests have been checked
            and the listening port has been closed.
        """
        cgiBinDir = os.path.abspath(self.mktemp())
        os.mkdir(cgiBinDir)
        root = twcgi.CGIDirectory(cgiBinDir)

        self.createScript(os.path.join(cgiBinDir, 'dummy'))

        cgiSubDir = os.path.join(cgiBinDir, 'sub')
        os.mkdir(cgiSubDir)

        self.createScript(os.path.join(cgiSubDir, 'dummy'))

        self.p = reactor.listenTCP(0, channel.HTTPFactory(server.Site(root)))
        portnum = self.p.getHost().port

        def _firstResponse(res):
            self.failUnlessEqual(res, "cgi output%s" % os.linesep)

            # Chain the second request so it runs after the first succeeds.
            return client.getPage('http://localhost:%d/sub/dummy' % portnum)

        def _secondResponse(res):
            self.failUnlessEqual(res, "cgi output%s" % os.linesep)

        def _cleanup(res):
            # Runs on success and failure alike; the original result (or
            # failure) is passed through once the port is closed.
            d = self.p.stopListening()
            d.addCallback(lambda ign: res)
            return d

        d = client.getPage('http://localhost:%d/dummy' % portnum)

        d.addCallback(_firstResponse
        ).addCallback(_secondResponse
        ).addBoth(_cleanup)

        return d

    # testScriptsExecute spawns CGI processes and uses the twisted.web client,
    # so skip it when either is missing instead of failing on them.
    if not interfaces.IReactorProcess.providedBy(reactor):
        testScriptsExecute.skip = (
            "CGI tests require a functional reactor.spawnProcess()")
    if not client:
        testScriptsExecute.skip = (
            "CGI tests require a twisted.web at the moment.")