1
# -*- coding: iso-8859-1 -*-
3
thfcgi.py - FastCGI communication with thread support
5
Copyright Peter ļæ½strand <astrand@lysator.liu.se> 2001
7
Modified for MoinMoin by Oliver Graf <ograf@bitart.de> 2003
9
Added "external application" support, refactored code
10
by Alexander Schremmer <alex AT alexanderweb DOT de>
12
Cleanup, fixed typos, PEP-8, support for limiting creation of threads,
13
limited number of requests lifetime, configurable backlog for socket
14
.listen() by MoinMoin:ThomasWaldmann.
16
2007 Support for Python's logging module by MoinMoin:ThomasWaldmann.
19
http://cvs.lysator.liu.se/viewcvs/viewcvs.cgi/webkom/thfcgi.py?cvsroot=webkom
21
This program is free software; you can redistribute it and/or modify
22
it under the terms of the GNU General Public License as published by
23
the Free Software Foundation; version 2 of the License.
25
This program is distributed in the hope that it will be useful,
26
but WITHOUT ANY WARRANTY; without even the implied warranty of
27
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28
GNU General Public License for more details.
30
You should have received a copy of the GNU General Public License
31
along with this program; if not, write to the Free Software
32
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
35
# TODO: Compare compare the number of bytes received on FCGI_STDIN with
36
# CONTENT_LENGTH and abort the update if the two numbers are not equal.
39
log = logging.getLogger(__name__)
47
from cStringIO import StringIO
51
import threading as _threading
53
import dummy_threading as _threading
55
# Maximum number of requests that can be handled
60
# Can this application multiplex connections?
64
FCGI_BEGIN_REQUEST = 1
65
FCGI_ABORT_REQUEST = 2
73
FCGI_GET_VALUES_RESULT = 10
74
FCGI_UNKNOWN_TYPE = 11
75
FCGI_MAXTYPE = FCGI_UNKNOWN_TYPE
77
# Types of management records
78
KNOWN_MANAGEMENT_TYPES = [FCGI_GET_VALUES]
80
FCGI_NULL_REQUEST_ID = 0
82
# Masks for flags component of FCGI_BEGIN_REQUEST
85
# Values for role component of FCGI_BEGIN_REQUEST
90
# Values for protocolStatus component of FCGI_END_REQUEST
91
FCGI_REQUEST_COMPLETE = 0 # Request completed ok
92
FCGI_CANT_MPX_CONN = 1 # This app cannot multiplex
93
FCGI_OVERLOADED = 2 # Too busy
94
FCGI_UNKNOWN_ROLE = 3 # Role value not known
97
FCGI_BeginRequestBody = "!HB5x"
98
FCGI_Record_header = "!BBHHBx"
99
FCGI_UnknownTypeBody = "!B7x"
100
FCGI_EndRequestBody = "!IB3x"
106
class SocketErrorOnWrite:
107
"""Is raised if a write fails in the socket code."""
111
"""Class representing FastCGI records"""
114
"""Initialize FastCGI record"""
115
self.version = FCGI_VERSION_1
116
self.rec_type = FCGI_UNKNOWN_TYPE
117
self.req_id = FCGI_NULL_REQUEST_ID
120
# Only in FCGI_BEGIN_REQUEST
125
# Only in FCGI_UNKNOWN_TYPE
126
self.unknownType = None
128
# Only in FCGI_END_REQUEST
129
self.appStatus = None
130
self.protocolStatus = None
132
def read_pair(self, data, pos):
133
"""Read a FastCGI key-value pair from the server."""
134
namelen = struct.unpack("!B", data[pos])[0]
137
namelen = struct.unpack("!I", data[pos:pos+4])[0] & 0x7fffffff
142
valuelen = struct.unpack("!B", data[pos])[0]
144
# 4-byte value length
145
valuelen = struct.unpack("!I", data[pos:pos+4])[0] & 0x7fffffff
150
name = data[pos:pos+namelen]
152
value = data[pos:pos+valuelen]
155
return name, value, pos
157
def write_pair(self, name, value):
158
"""Write a FastCGI key-value pair to the server."""
161
data = struct.pack("!B", namelen)
164
data = struct.pack("!I", namelen | 0x80000000L)
166
valuelen = len(value)
168
data += struct.pack("!B", value)
170
# 4-byte value length
171
data += struct.pack("!I", value | 0x80000000L)
173
return data + name + value
175
def readRecord(self, sock):
176
"""Read a FastCGI record from the server."""
179
# No data received. This means EOF.
182
self.version, self.rec_type, self.req_id, contentLength, paddingLength = \
183
struct.unpack(FCGI_Record_header, data)
186
while len(self.content) < contentLength:
187
data = sock.recv(contentLength - len(self.content))
188
self.content = self.content + data
189
if paddingLength != 0:
190
sock.recv(paddingLength)
192
# Parse the content information
193
if self.rec_type == FCGI_BEGIN_REQUEST:
194
self.role, self.flags = struct.unpack(FCGI_BeginRequestBody, self.content)
195
self.keep_conn = self.flags & FCGI_KEEP_CONN
197
elif self.rec_type == FCGI_UNKNOWN_TYPE:
198
self.unknownType = struct.unpack(FCGI_UnknownTypeBody, self.content)
200
elif self.rec_type == FCGI_GET_VALUES or self.rec_type == FCGI_PARAMS:
203
while pos < len(self.content):
204
name, value, pos = self.read_pair(self.content, pos)
205
self.values[name] = value
207
elif self.rec_type == FCGI_END_REQUEST:
208
self.appStatus, self.protocolStatus = struct.unpack(FCGI_EndRequestBody, self.content)
212
def writeRecord(self, sock):
213
"""Write a FastCGI record to the server."""
214
content = self.content
215
if self.rec_type == FCGI_BEGIN_REQUEST:
216
content = struct.pack(FCGI_BeginRequestBody, self.role, self.flags)
218
elif self.rec_type == FCGI_UNKNOWN_TYPE:
219
content = struct.pack(FCGI_UnknownTypeBody, self.unknownType)
221
elif self.rec_type == FCGI_GET_VALUES or self.rec_type == FCGI_PARAMS:
223
for i in self.values:
224
content = content + self.write_pair(i, self.values[i])
226
elif self.rec_type == FCGI_END_REQUEST:
227
content = struct.pack(FCGI_EndRequestBody, self.appStatus, self.protocolStatus)
229
# Align to 8-byte boundary
231
padlen = ((clen + 7) & 0xfff8) - clen
233
hdr = struct.pack(FCGI_Record_header, self.version, self.rec_type, self.req_id, clen, padlen)
236
sock.sendall(hdr + content + padlen*"\x00")
238
# Write error, probably broken pipe. Exit.
239
raise SocketErrorOnWrite
243
"""A request, corresponding to an accept():ed connection and
246
def __init__(self, conn, req_handler, inthread=False):
247
"""Initialize Request container."""
249
self.req_handler = req_handler
250
self.inthread = inthread
257
self.env_complete = 0
258
self.stdin = StringIO()
259
self.stdin_complete = 0
260
self.data = StringIO()
261
self.data_complete = 0
264
self.out = StringIO()
265
self.err = StringIO()
267
self.have_finished = 0
270
"""Read records for this request and handle them through the
274
if self.conn.fileno() < 1:
276
raise Exception("Connection lost")
280
select.select([self.conn], [], [])
282
if rec.readRecord(self.conn):
283
self._handle_record(rec)
285
# EOF, connection closed. Break loop, end thread.
288
def getFieldStorage(self):
289
"""Return a cgi FieldStorage constructed from the stdin and
290
environ read from the server for this request."""
292
# cgi.FieldStorage will eat the input here...
293
r = cgi.FieldStorage(fp=self.stdin, environ=self.env, keep_blank_values=1)
294
# hence, we reset here so we can obtain
299
def _flush(self, stream):
300
"""Flush a stream of this request."""
304
rec.rec_type = FCGI_STDOUT
305
rec.req_id = self.req_id
309
# Writing zero bytes would mean stream termination
313
chunk, data = self.getNextChunk(data)
315
rec.writeRecord(self.conn)
321
"""Flush Requests stdout stream."""
322
self._flush(self.out)
325
"""Flush Requests stderr stream."""
326
self._flush(self.err)
328
def finish(self, status=0):
329
"""Finish this Request, flushing all output and
330
possible exiting this thread."""
331
if self.have_finished:
334
self.have_finished = 1
337
if self.err.tell(): # just send err record if there is data on the err stream
340
rec.rec_type = FCGI_STDERR
341
rec.req_id = self.req_id
342
data = self.err.read()
344
chunk, data = self.getNextChunk(data)
346
rec.writeRecord(self.conn)
348
rec.writeRecord(self.conn) # Terminate stream
353
rec.rec_type = FCGI_STDOUT
354
rec.req_id = self.req_id
355
data = self.out.read()
357
chunk, data = self.getNextChunk(data)
359
rec.writeRecord(self.conn)
361
rec.writeRecord(self.conn) # Terminate stream
365
rec.rec_type = FCGI_END_REQUEST
366
rec.req_id = self.req_id
367
rec.appStatus = status
368
rec.protocolStatus = FCGI_REQUEST_COMPLETE
369
rec.writeRecord(self.conn)
370
if not self.keep_conn:
378
def _handle_record(self, rec):
380
if rec.req_id == FCGI_NULL_REQUEST_ID:
382
self._handle_man_record(rec)
385
self._handle_app_record(rec)
387
def _handle_man_record(self, rec):
388
"""Handle management record."""
389
rec_type = rec.rec_type
390
if rec_type in KNOWN_MANAGEMENT_TYPES:
391
self._handle_known_man_types(rec)
393
# It's a management record of an unknown type. Signal the error.
395
rec.rec_type = FCGI_UNKNOWN_TYPE
396
rec.unknownType = rec_type
397
rec.writeRecord(self.conn)
399
def _handle_known_man_types(self, rec):
400
"""Handle a known management record."""
401
if rec.rec_type == FCGI_GET_VALUES:
403
reply_rec.rec_type = FCGI_GET_VALUES_RESULT
405
params = {'FCGI_MAX_CONNS': FCGI_MAX_CONNS,
406
'FCGI_MAX_REQS': FCGI_MAX_REQS,
407
'FCGI_MPXS_CONNS': FCGI_MPXS_CONNS,
410
for name in rec.values:
412
# We known this value, include in reply
413
reply_rec.values[name] = params[name]
415
rec.writeRecord(self.conn)
417
def _handle_app_record(self, rec):
418
"""Handle an application record. This calls the specified
419
request_handler, if environ and stdin is complete."""
420
if rec.rec_type == FCGI_BEGIN_REQUEST:
422
self._handle_begin_request(rec)
424
elif rec.req_id != self.req_id:
425
log("Received unknown request ID %r" % rec.req_id)
426
# Ignore requests that aren't active
428
if rec.rec_type == FCGI_ABORT_REQUEST:
430
rec.rec_type = FCGI_END_REQUEST
431
rec.protocolStatus = FCGI_REQUEST_COMPLETE
433
rec.writeRecord(self.conn)
435
elif rec.rec_type == FCGI_PARAMS:
437
self._handle_params(rec)
438
elif rec.rec_type == FCGI_STDIN:
440
self._handle_stdin(rec)
441
elif rec.rec_type == FCGI_DATA:
443
self._handle_data(rec)
445
# Should never happen.
446
log("Received unknown FCGI record type %r" % rec.rec_type)
449
if self.env_complete and self.stdin_complete:
450
# Call application request handler.
451
# The arguments sent to the request handler is:
454
# env: The request environment
455
# form: FieldStorage.
456
self.req_handler(self, self.env, self.getFieldStorage())
458
def _handle_begin_request(self, rec):
459
"""Handle begin request."""
460
if rec.role != FCGI_RESPONDER:
461
# Unknown role, signal error.
462
rec.rec_type = FCGI_END_REQUEST
464
rec.protocolStatus = FCGI_UNKNOWN_ROLE
465
rec.writeRecord(self.conn)
468
self.req_id = rec.req_id
469
self.keep_conn = rec.keep_conn
471
def _handle_params(self, rec):
472
"""Handle environment."""
473
if self.env_complete:
475
log("Received FCGI_PARAMS more than once")
479
self.env_complete = 1
481
# Add all vars to our environment
482
self.env.update(rec.values)
484
def _handle_stdin(self, rec):
486
if self.stdin_complete:
488
log("Received FCGI_STDIN more than once")
492
self.stdin_complete = 1
496
self.stdin.write(rec.content)
498
def _handle_data(self, rec):
500
if self.data_complete:
502
log("Received FCGI_DATA more than once")
506
self.data_complete = 1
508
self.data.write(rec.content)
510
def getNextChunk(self, data):
511
"""Helper function which returns chunks of data."""
519
def __init__(self, req_handler, fd=sys.stdin, port=None, max_requests=-1, backlog=5, max_threads=5):
520
"""Initialize main loop and set request_handler."""
521
self.req_handler = req_handler
525
# how many requests we have left before terminating this process, -1 means infinite lifetime:
526
self.requests_left = max_requests
527
# for socket.listen(backlog):
528
self.backlog = backlog
529
# how many threads we have at maximum (including the main program = 1. thread)
530
self.max_threads = max_threads
532
def accept_handler(self, conn, addr, inthread=False):
533
"""Construct Request and run() it."""
534
self._check_good_addrs(addr)
536
req = Request(conn, self.req_handler, inthread)
538
except SocketErrorOnWrite:
541
def _make_socket(self):
542
"""Create socket and verify FCGI environment."""
545
if isinstance(self.__port, str):
547
os.unlink(self.__port)
550
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
552
# os.chmod(self.__port, 0660)
554
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
555
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
556
# bind to the localhost
557
s.bind(('127.0.0.1', self.__port))
560
if hasattr(socket, 'fromfd'):
561
s = socket.fromfd(self.fd.fileno(), socket.AF_INET, socket.SOCK_STREAM)
563
else: # we do not run on posix, fire up an FCGI external process
564
raise ValueError("FastCGI port is not setup correctly")
565
except socket.error, (err, errmsg):
566
if err != errno.ENOTCONN:
567
raise RuntimeError("No FastCGI environment: %s - %s" % (repr(err), errmsg))
571
def _check_good_addrs(self, addr):
572
"""Check if request is done from the right server."""
573
# Apaches mod_fastcgi seems not to use FCGI_WEB_SERVER_ADDRS.
574
if 'FCGI_WEB_SERVER_ADDRS' in os.environ:
575
good_addrs = os.environ['FCGI_WEB_SERVER_ADDRS'].split(',')
576
good_addrs = [addr.strip() for addr in good_addrs] # Remove whitespace
580
# Check if the connection is from a legal address
581
if good_addrs is not None and addr not in good_addrs:
582
raise RuntimeError("Connection from invalid server!")
585
"""Wait & serve. Calls request_handler on every request."""
586
self.sock.listen(self.backlog)
588
log("Starting Process (PID=%d)" % pid)
591
if not self.requests_left:
592
# self.sock.shutdown(RDWR) here does NOT help with backlog
593
log("Maximum number of processed requests reached, terminating this worker process (PID=%d)..." % pid)
595
elif self.requests_left > 0:
596
self.requests_left -= 1
598
conn, addr = self.sock.accept()
599
threadcount = _threading.activeCount()
600
if threadcount < self.max_threads:
601
log("Accepted connection, %d active threads, starting worker thread..." % threadcount)
602
t = _threading.Thread(target=self.accept_handler, args=(conn, addr, True))
605
log("Accepted connection, %d active threads, running in main thread..." % threadcount)
606
self.accept_handler(conn, addr, False)
608
log("Ending Process (PID=%d)" % pid)