1
"""CGI-savvy HTTP Server.
3
This module builds on SimpleHTTPServer by implementing GET and POST
4
requests to cgi-bin scripts.
6
If the os.fork() function is not present (e.g. on Windows),
7
os.popen2() is used as a fallback, with slightly altered semantics; if
8
that function is not present either (e.g. on Macintosh), only Python
9
scripts are supported, and they are executed by the current process.
11
In all cases, the implementation is intentionally naive -- all
12
requests are executed sychronously.
14
SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
15
-- it may execute arbitrary Python code or external programs.
17
Note that status code 200 is sent prior to execution of a CGI script, so
18
scripts cannot send other status codes such as 302 (redirect).
24
__all__ = ["CGIHTTPRequestHandler"]
30
import SimpleHTTPServer
34
class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
36
"""Complete HTTP server with GET, HEAD and POST commands.
38
GET and HEAD also support running CGI scripts.
40
The POST command is *only* implemented for CGI scripts.
44
# Determine platform specifics
45
have_fork = hasattr(os, 'fork')
46
have_popen2 = hasattr(os, 'popen2')
47
have_popen3 = hasattr(os, 'popen3')
49
# Make rfile unbuffered -- we need to read one line and then pass
50
# the rest to a subprocess, so we can't use buffered input.
54
"""Serve a POST request.
56
This is only implemented for CGI scripts.
63
self.send_error(501, "Can only POST to CGI scripts")
66
"""Version of send_head that support CGI scripts"""
70
return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
73
"""Test whether self.path corresponds to a CGI script,
76
This function sets self.cgi_info to a tuple (dir, rest)
77
when it returns True, where dir is the directory part before
78
the CGI script name. Note that rest begins with a
79
slash if it is not empty.
81
The default implementation tests whether the path
82
begins with one of the strings in the list
83
self.cgi_directories (and the next character is a '/'
84
or the end of the string).
89
for x in self.cgi_directories:
91
if path[:i] == x and (not path[i:] or path[i] == '/'):
92
self.cgi_info = path[:i], path[i+1:]
96
cgi_directories = ['/cgi-bin', '/htbin']
98
def is_executable(self, path):
99
"""Test whether argument path is an executable file."""
100
return executable(path)
102
def is_python(self, path):
103
"""Test whether argument path is a Python script."""
104
head, tail = os.path.splitext(path)
105
return tail.lower() in (".py", ".pyw")
108
"""Execute a CGI script."""
110
dir, rest = self.cgi_info
112
i = path.find('/', len(dir) + 1)
115
nextrest = path[i+1:]
117
scriptdir = self.translate_path(nextdir)
118
if os.path.isdir(scriptdir):
119
dir, rest = nextdir, nextrest
120
i = path.find('/', len(dir) + 1)
124
# find an explicit query string, if present.
127
rest, query = rest[:i], rest[i+1:]
131
# dissect the part after the directory name into a script name &
132
# a possible additional path, to be stored in PATH_INFO.
135
script, rest = rest[:i], rest[i:]
137
script, rest = rest, ''
139
scriptname = dir + '/' + script
140
scriptfile = self.translate_path(scriptname)
141
if not os.path.exists(scriptfile):
142
self.send_error(404, "No such CGI script (%r)" % scriptname)
144
if not os.path.isfile(scriptfile):
145
self.send_error(403, "CGI script is not a plain file (%r)" %
148
ispy = self.is_python(scriptname)
150
if not (self.have_fork or self.have_popen2 or self.have_popen3):
151
self.send_error(403, "CGI script is not a Python script (%r)" %
154
if not self.is_executable(scriptfile):
155
self.send_error(403, "CGI script is not executable (%r)" %
159
# Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
160
# XXX Much of the following could be prepared ahead of time!
162
env['SERVER_SOFTWARE'] = self.version_string()
163
env['SERVER_NAME'] = self.server.server_name
164
env['GATEWAY_INTERFACE'] = 'CGI/1.1'
165
env['SERVER_PROTOCOL'] = self.protocol_version
166
env['SERVER_PORT'] = str(self.server.server_port)
167
env['REQUEST_METHOD'] = self.command
168
uqrest = urllib.unquote(rest)
169
env['PATH_INFO'] = uqrest
170
env['PATH_TRANSLATED'] = self.translate_path(uqrest)
171
env['SCRIPT_NAME'] = scriptname
173
env['QUERY_STRING'] = query
174
host = self.address_string()
175
if host != self.client_address[0]:
176
env['REMOTE_HOST'] = host
177
env['REMOTE_ADDR'] = self.client_address[0]
178
authorization = self.headers.getheader("authorization")
180
authorization = authorization.split()
181
if len(authorization) == 2:
182
import base64, binascii
183
env['AUTH_TYPE'] = authorization[0]
184
if authorization[0].lower() == "basic":
186
authorization = base64.decodestring(authorization[1])
187
except binascii.Error:
190
authorization = authorization.split(':')
191
if len(authorization) == 2:
192
env['REMOTE_USER'] = authorization[0]
194
if self.headers.typeheader is None:
195
env['CONTENT_TYPE'] = self.headers.type
197
env['CONTENT_TYPE'] = self.headers.typeheader
198
length = self.headers.getheader('content-length')
200
env['CONTENT_LENGTH'] = length
201
referer = self.headers.getheader('referer')
203
env['HTTP_REFERER'] = referer
205
for line in self.headers.getallmatchingheaders('accept'):
206
if line[:1] in "\t\n\r ":
207
accept.append(line.strip())
209
accept = accept + line[7:].split(',')
210
env['HTTP_ACCEPT'] = ','.join(accept)
211
ua = self.headers.getheader('user-agent')
213
env['HTTP_USER_AGENT'] = ua
214
co = filter(None, self.headers.getheaders('cookie'))
216
env['HTTP_COOKIE'] = ', '.join(co)
217
# XXX Other HTTP_* headers
218
# Since we're setting the env in the parent, provide empty
219
# values to override previously set values
220
for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
221
'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
222
env.setdefault(k, "")
223
os.environ.update(env)
225
self.send_response(200, "Script output follows")
227
decoded_query = query.replace('+', ' ')
230
# Unix -- fork as we should
232
if '=' not in decoded_query:
233
args.append(decoded_query)
234
nobody = nobody_uid()
235
self.wfile.flush() # Always flush before forking
239
pid, sts = os.waitpid(pid, 0)
240
# throw away additional data [see bug #427345]
241
while select.select([self.rfile], [], [], 0)[0]:
242
if not self.rfile.read(1):
245
self.log_error("CGI script exit status %#x", sts)
253
os.dup2(self.rfile.fileno(), 0)
254
os.dup2(self.wfile.fileno(), 1)
255
os.execve(scriptfile, args, os.environ)
257
self.server.handle_error(self.request, self.client_address)
260
elif self.have_popen2 or self.have_popen3:
261
# Windows -- use popen2 or popen3 to create a subprocess
268
if self.is_python(scriptfile):
269
interp = sys.executable
270
if interp.lower().endswith("w.exe"):
271
# On Windows, use python.exe, not pythonw.exe
272
interp = interp[:-5] + interp[-4:]
273
cmdline = "%s -u %s" % (interp, cmdline)
274
if '=' not in query and '"' not in query:
275
cmdline = '%s "%s"' % (cmdline, query)
276
self.log_message("command: %s", cmdline)
279
except (TypeError, ValueError):
281
files = popenx(cmdline, 'b')
286
if self.command.lower() == "post" and nbytes > 0:
287
data = self.rfile.read(nbytes)
289
# throw away additional data [see bug #427345]
290
while select.select([self.rfile._sock], [], [], 0)[0]:
291
if not self.rfile._sock.recv(1):
294
shutil.copyfileobj(fo, self.wfile)
299
self.log_error('%s', errors)
302
self.log_error("CGI script exit status %#x", sts)
304
self.log_message("CGI script exited OK")
307
# Other O.S. -- execute script in this process
309
save_stdin = sys.stdin
310
save_stdout = sys.stdout
311
save_stderr = sys.stderr
313
save_cwd = os.getcwd()
315
sys.argv = [scriptfile]
316
if '=' not in decoded_query:
317
sys.argv.append(decoded_query)
318
sys.stdout = self.wfile
319
sys.stdin = self.rfile
320
execfile(scriptfile, {"__name__": "__main__"})
323
sys.stdin = save_stdin
324
sys.stdout = save_stdout
325
sys.stderr = save_stderr
327
except SystemExit, sts:
328
self.log_error("CGI script exit status %s", str(sts))
330
self.log_message("CGI script exited OK")
336
"""Internal routine to get nobody's uid"""
345
nobody = pwd.getpwnam('nobody')[2]
347
nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
351
def executable(path):
352
"""Test for executable file."""
357
return st.st_mode & 0111 != 0
360
def test(HandlerClass = CGIHTTPRequestHandler,
361
ServerClass = BaseHTTPServer.HTTPServer):
362
SimpleHTTPServer.test(HandlerClass, ServerClass)
365
if __name__ == '__main__':