~pythonregexp2.7/python/issue2636

« back to all changes in this revision

Viewing changes to Lib/test/test_httpservers.py

  • Committer: Jeffrey C. "The TimeHorse" Jacobs
  • Date: 2008-05-24 16:05:21 UTC
  • mfrom: (39021.1.401 Regexp-2.6)
  • Revision ID: darklord@timehorse.com-20080524160521-1xenj7p6u3wb89et
Merged in changes from the latest python source snapshot.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Unittests for the various HTTPServer modules.
 
2
 
 
3
Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
 
4
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
 
5
"""
 
6
 
 
7
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
8
from SimpleHTTPServer import SimpleHTTPRequestHandler
 
9
from CGIHTTPServer import CGIHTTPRequestHandler
 
10
 
 
11
import os
 
12
import sys
 
13
import base64
 
14
import shutil
 
15
import urllib
 
16
import httplib
 
17
import tempfile
 
18
import threading
 
19
 
 
20
import unittest
 
21
from test import test_support
 
22
 
 
23
 
 
24
class NoLogRequestHandler:
 
25
    def log_message(self, *args):
 
26
        # don't write log messages to stderr
 
27
        pass
 
28
 
 
29
 
 
30
class TestServerThread(threading.Thread):
 
31
    def __init__(self, test_object, request_handler):
 
32
        threading.Thread.__init__(self)
 
33
        self.request_handler = request_handler
 
34
        self.test_object = test_object
 
35
        self.test_object.lock.acquire()
 
36
 
 
37
    def run(self):
 
38
        self.server = HTTPServer(('', 0), self.request_handler)
 
39
        self.test_object.PORT = self.server.socket.getsockname()[1]
 
40
        self.test_object.lock.release()
 
41
        try:
 
42
            self.server.serve_forever()
 
43
        finally:
 
44
            self.server.server_close()
 
45
 
 
46
    def stop(self):
 
47
        self.server.shutdown()
 
48
 
 
49
 
 
50
class BaseTestCase(unittest.TestCase):
 
51
    def setUp(self):
 
52
        self.lock = threading.Lock()
 
53
        self.thread = TestServerThread(self, self.request_handler)
 
54
        self.thread.start()
 
55
        self.lock.acquire()
 
56
 
 
57
    def tearDown(self):
 
58
        self.lock.release()
 
59
        self.thread.stop()
 
60
 
 
61
    def request(self, uri, method='GET', body=None, headers={}):
 
62
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
 
63
        self.connection.request(method, uri, body, headers)
 
64
        return self.connection.getresponse()
 
65
 
 
66
 
 
67
class BaseHTTPServerTestCase(BaseTestCase):
 
68
    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
 
69
        protocol_version = 'HTTP/1.1'
 
70
        default_request_version = 'HTTP/1.1'
 
71
 
 
72
        def do_TEST(self):
 
73
            self.send_response(204)
 
74
            self.send_header('Content-Type', 'text/html')
 
75
            self.send_header('Connection', 'close')
 
76
            self.end_headers()
 
77
 
 
78
        def do_KEEP(self):
 
79
            self.send_response(204)
 
80
            self.send_header('Content-Type', 'text/html')
 
81
            self.send_header('Connection', 'keep-alive')
 
82
            self.end_headers()
 
83
 
 
84
        def do_KEYERROR(self):
 
85
            self.send_error(999)
 
86
 
 
87
        def do_CUSTOM(self):
 
88
            self.send_response(999)
 
89
            self.send_header('Content-Type', 'text/html')
 
90
            self.send_header('Connection', 'close')
 
91
            self.end_headers()
 
92
 
 
93
    def setUp(self):
 
94
        BaseTestCase.setUp(self)
 
95
        self.con = httplib.HTTPConnection('localhost', self.PORT)
 
96
        self.con.connect()
 
97
 
 
98
    def test_command(self):
 
99
        self.con.request('GET', '/')
 
100
        res = self.con.getresponse()
 
101
        self.assertEquals(res.status, 501)
 
102
 
 
103
    def test_request_line_trimming(self):
 
104
        self.con._http_vsn_str = 'HTTP/1.1\n'
 
105
        self.con.putrequest('GET', '/')
 
106
        self.con.endheaders()
 
107
        res = self.con.getresponse()
 
108
        self.assertEquals(res.status, 501)
 
109
 
 
110
    def test_version_bogus(self):
 
111
        self.con._http_vsn_str = 'FUBAR'
 
112
        self.con.putrequest('GET', '/')
 
113
        self.con.endheaders()
 
114
        res = self.con.getresponse()
 
115
        self.assertEquals(res.status, 400)
 
116
 
 
117
    def test_version_digits(self):
 
118
        self.con._http_vsn_str = 'HTTP/9.9.9'
 
119
        self.con.putrequest('GET', '/')
 
120
        self.con.endheaders()
 
121
        res = self.con.getresponse()
 
122
        self.assertEquals(res.status, 400)
 
123
 
 
124
    def test_version_none_get(self):
 
125
        self.con._http_vsn_str = ''
 
126
        self.con.putrequest('GET', '/')
 
127
        self.con.endheaders()
 
128
        res = self.con.getresponse()
 
129
        self.assertEquals(res.status, 501)
 
130
 
 
131
    def test_version_none(self):
 
132
        self.con._http_vsn_str = ''
 
133
        self.con.putrequest('PUT', '/')
 
134
        self.con.endheaders()
 
135
        res = self.con.getresponse()
 
136
        self.assertEquals(res.status, 400)
 
137
 
 
138
    def test_version_invalid(self):
 
139
        self.con._http_vsn = 99
 
140
        self.con._http_vsn_str = 'HTTP/9.9'
 
141
        self.con.putrequest('GET', '/')
 
142
        self.con.endheaders()
 
143
        res = self.con.getresponse()
 
144
        self.assertEquals(res.status, 505)
 
145
 
 
146
    def test_send_blank(self):
 
147
        self.con._http_vsn_str = ''
 
148
        self.con.putrequest('', '')
 
149
        self.con.endheaders()
 
150
        res = self.con.getresponse()
 
151
        self.assertEquals(res.status, 400)
 
152
 
 
153
    def test_header_close(self):
 
154
        self.con.putrequest('GET', '/')
 
155
        self.con.putheader('Connection', 'close')
 
156
        self.con.endheaders()
 
157
        res = self.con.getresponse()
 
158
        self.assertEquals(res.status, 501)
 
159
 
 
160
    def test_head_keep_alive(self):
 
161
        self.con._http_vsn_str = 'HTTP/1.1'
 
162
        self.con.putrequest('GET', '/')
 
163
        self.con.putheader('Connection', 'keep-alive')
 
164
        self.con.endheaders()
 
165
        res = self.con.getresponse()
 
166
        self.assertEquals(res.status, 501)
 
167
 
 
168
    def test_handler(self):
 
169
        self.con.request('TEST', '/')
 
170
        res = self.con.getresponse()
 
171
        self.assertEquals(res.status, 204)
 
172
 
 
173
    def test_return_header_keep_alive(self):
 
174
        self.con.request('KEEP', '/')
 
175
        res = self.con.getresponse()
 
176
        self.assertEquals(res.getheader('Connection'), 'keep-alive')
 
177
        self.con.request('TEST', '/')
 
178
 
 
179
    def test_internal_key_error(self):
 
180
        self.con.request('KEYERROR', '/')
 
181
        res = self.con.getresponse()
 
182
        self.assertEquals(res.status, 999)
 
183
 
 
184
    def test_return_custom_status(self):
 
185
        self.con.request('CUSTOM', '/')
 
186
        res = self.con.getresponse()
 
187
        self.assertEquals(res.status, 999)
 
188
 
 
189
 
 
190
class SimpleHTTPServerTestCase(BaseTestCase):
 
191
    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
 
192
        pass
 
193
 
 
194
    def setUp(self):
 
195
        BaseTestCase.setUp(self)
 
196
        self.cwd = os.getcwd()
 
197
        basetempdir = tempfile.gettempdir()
 
198
        os.chdir(basetempdir)
 
199
        self.data = 'We are the knights who say Ni!'
 
200
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
 
201
        self.tempdir_name = os.path.basename(self.tempdir)
 
202
        temp = open(os.path.join(self.tempdir, 'test'), 'wb')
 
203
        temp.write(self.data)
 
204
        temp.close()
 
205
 
 
206
    def tearDown(self):
 
207
        try:
 
208
            os.chdir(self.cwd)
 
209
            try:
 
210
                shutil.rmtree(self.tempdir)
 
211
            except:
 
212
                pass
 
213
        finally:
 
214
            BaseTestCase.tearDown(self)
 
215
 
 
216
    def check_status_and_reason(self, response, status, data=None):
 
217
        body = response.read()
 
218
        self.assert_(response)
 
219
        self.assertEquals(response.status, status)
 
220
        self.assert_(response.reason != None)
 
221
        if data:
 
222
            self.assertEqual(data, body)
 
223
 
 
224
    def test_get(self):
 
225
        #constructs the path relative to the root directory of the HTTPServer
 
226
        response = self.request(self.tempdir_name + '/test')
 
227
        self.check_status_and_reason(response, 200, data=self.data)
 
228
        response = self.request(self.tempdir_name + '/')
 
229
        self.check_status_and_reason(response, 200)
 
230
        response = self.request(self.tempdir_name)
 
231
        self.check_status_and_reason(response, 301)
 
232
        response = self.request('/ThisDoesNotExist')
 
233
        self.check_status_and_reason(response, 404)
 
234
        response = self.request('/' + 'ThisDoesNotExist' + '/')
 
235
        self.check_status_and_reason(response, 404)
 
236
        f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
 
237
        response = self.request('/' + self.tempdir_name + '/')
 
238
        self.check_status_and_reason(response, 200)
 
239
        if os.name == 'posix':
 
240
            # chmod won't work as expected on Windows platforms
 
241
            os.chmod(self.tempdir, 0)
 
242
            response = self.request(self.tempdir_name + '/')
 
243
            self.check_status_and_reason(response, 404)
 
244
            os.chmod(self.tempdir, 0755)
 
245
 
 
246
    def test_head(self):
 
247
        response = self.request(
 
248
            self.tempdir_name + '/test', method='HEAD')
 
249
        self.check_status_and_reason(response, 200)
 
250
        self.assertEqual(response.getheader('content-length'),
 
251
                         str(len(self.data)))
 
252
        self.assertEqual(response.getheader('content-type'),
 
253
                         'application/octet-stream')
 
254
 
 
255
    def test_invalid_requests(self):
 
256
        response = self.request('/', method='FOO')
 
257
        self.check_status_and_reason(response, 501)
 
258
        # requests must be case sensitive,so this should fail too
 
259
        response = self.request('/', method='get')
 
260
        self.check_status_and_reason(response, 501)
 
261
        response = self.request('/', method='GETs')
 
262
        self.check_status_and_reason(response, 501)
 
263
 
 
264
 
 
265
cgi_file1 = """\
 
266
#!%s
 
267
 
 
268
print "Content-type: text/html"
 
269
print
 
270
print "Hello World"
 
271
"""
 
272
 
 
273
cgi_file2 = """\
 
274
#!%s
 
275
import cgi
 
276
 
 
277
print "Content-type: text/html"
 
278
print
 
279
 
 
280
form = cgi.FieldStorage()
 
281
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
 
282
              form.getfirst("bacon"))
 
283
"""
 
284
 
 
285
class CGIHTTPServerTestCase(BaseTestCase):
 
286
    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
 
287
        pass
 
288
 
 
289
    def setUp(self):
 
290
        BaseTestCase.setUp(self)
 
291
        self.parent_dir = tempfile.mkdtemp()
 
292
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
 
293
        os.mkdir(self.cgi_dir)
 
294
 
 
295
        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
 
296
        with open(self.file1_path, 'w') as file1:
 
297
            file1.write(cgi_file1 % sys.executable)
 
298
        os.chmod(self.file1_path, 0777)
 
299
 
 
300
        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
 
301
        with open(self.file2_path, 'w') as file2:
 
302
            file2.write(cgi_file2 % sys.executable)
 
303
        os.chmod(self.file2_path, 0777)
 
304
 
 
305
        self.cwd = os.getcwd()
 
306
        os.chdir(self.parent_dir)
 
307
 
 
308
    def tearDown(self):
 
309
        try:
 
310
            os.chdir(self.cwd)
 
311
            os.remove(self.file1_path)
 
312
            os.remove(self.file2_path)
 
313
            os.rmdir(self.cgi_dir)
 
314
            os.rmdir(self.parent_dir)
 
315
        finally:
 
316
            BaseTestCase.tearDown(self)
 
317
 
 
318
    def test_headers_and_content(self):
 
319
        res = self.request('/cgi-bin/file1.py')
 
320
        self.assertEquals(('Hello World\n', 'text/html', 200), \
 
321
             (res.read(), res.getheader('Content-type'), res.status))
 
322
 
 
323
    def test_post(self):
 
324
        params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
 
325
        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
 
326
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
 
327
 
 
328
        self.assertEquals(res.read(), '1, python, 123456\n')
 
329
 
 
330
    def test_invaliduri(self):
 
331
        res = self.request('/cgi-bin/invalid')
 
332
        res.read()
 
333
        self.assertEquals(res.status, 404)
 
334
 
 
335
    def test_authorization(self):
 
336
        headers = {'Authorization' : 'Basic %s' % \
 
337
                base64.b64encode('username:pass')}
 
338
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
 
339
        self.assertEquals(('Hello World\n', 'text/html', 200), \
 
340
             (res.read(), res.getheader('Content-type'), res.status))
 
341
 
 
342
 
 
343
def test_main(verbose=None):
 
344
    try:
 
345
        cwd = os.getcwd()
 
346
        test_support.run_unittest(BaseHTTPServerTestCase,
 
347
                                  SimpleHTTPServerTestCase,
 
348
                                  CGIHTTPServerTestCase
 
349
                                  )
 
350
    finally:
 
351
        os.chdir(cwd)
 
352
 
 
353
if __name__ == '__main__':
 
354
    test_main()