1
# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
7
# 1. Redistributions of source code must retain the above copyright
8
# notice, this list of conditions and the following disclaimer.
9
# 2. Redistributions in binary form must reproduce the above copyright
10
# notice, this list of conditions and the following disclaimer in the
11
# documentation and/or other materials provided with the distribution.
13
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
__author__ = 'Allan Saddi <allan@saddi.com>'
28
__version__ = '$Revision$'
42
def setCloseOnExec(sock):
45
def setCloseOnExec(sock):
46
fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
48
# If running Python < 2.4, require eunuchs module for socket.socketpair().
49
# See <http://www.inoi.fi/open/trac/eunuchs>.
50
if not hasattr(socket, 'socketpair'):
52
import eunuchs.socketpair
54
# TODO: Other alternatives? Perhaps using os.pipe()?
55
raise ImportError, 'Requires eunuchs module for Python < 2.4'
58
s1, s2 = eunuchs.socketpair.socketpair()
59
p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
60
socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
65
socket.socketpair = socketpair
67
class PreforkServer(object):
69
A preforked server model conceptually similar to Apache httpd(2). At
70
any given time, ensures there are at least minSpare children ready to
71
process new requests (up to a maximum of maxChildren children total).
72
If the number of idle children is ever above maxSpare, the extra
75
If maxRequests is positive, each child will only handle that many
76
requests in its lifetime before exiting.
78
jobClass should be a class whose constructor takes at least two
79
arguments: the client socket and client address. jobArgs, which
80
must be a list or tuple, is any additional (static) arguments you
81
wish to pass to the constructor.
83
jobClass should have a run() method (taking no arguments) that does
84
the actual work. When run() returns, the request is considered
85
complete and the child process moves to idle state.
87
def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
88
maxRequests=0, jobClass=None, jobArgs=()):
89
self._minSpare = minSpare
90
self._maxSpare = maxSpare
91
self._maxChildren = max(maxSpare, maxChildren)
92
self._maxRequests = maxRequests
93
self._jobClass = jobClass
94
self._jobArgs = jobArgs
96
# Internal state of children. Maps pids to dictionaries with two
97
# members: 'file' and 'avail'. 'file' is the socket to that
98
# individual child and 'avail' is whether or not the child is
99
# free to process requests.
102
self._children_to_purge = []
106
raise ValueError("minSpare must be at least 1!")
107
if maxSpare < minSpare:
108
raise ValueError("maxSpare must be greater than, or equal to, minSpare!")
112
The main loop. Pass a socket that is ready to accept() client
113
connections. Return value will be True or False indicating whether
114
or not the loop was exited due to SIGHUP.
116
# Set up signal handlers.
117
self._keepGoing = True
118
self._hupReceived = False
119
self._installSignalHandlers()
121
# Don't want operations on main socket to block.
128
while self._keepGoing:
129
# Maintain minimum number of children. Note that we are checking
130
# the absolute number of children, not the number of "available"
131
# children. We explicitly test against _maxSpare to maintain
132
# an *optimistic* absolute minimum. The number of children will
133
# always be in the range [_maxSpare, _maxChildren].
134
while len(self._children) < self._maxSpare:
135
if not self._spawnChild(sock): break
137
# Wait on any socket activity from live children.
138
r = [x['file'] for x in self._children.values()
139
if x['file'] is not None]
141
if len(r) == len(self._children) and not self._children_to_purge:
144
# There are dead children that need to be reaped, ensure
145
# that they are by timing out, if necessary. Or there are some
146
# children that need to die.
150
if (time.time() > self._last_purge + 10):
151
w = [x for x in self._children_to_purge if x.fileno() != -1]
153
r, w, e = select.select(r, w, [], timeout)
154
except select.error, e:
155
if e[0] != errno.EINTR:
158
# Scan child sockets and tend to those that need attention.
160
# Receive status byte.
162
state = child.recv(1)
163
except socket.error, e:
164
if e[0] in (errno.EAGAIN, errno.EINTR):
165
# Guess it really didn't need attention?
168
# Try to match it with a child. (Do we need a reverse map?)
169
for pid,d in self._children.items():
170
if child is d['file']:
172
# Set availability status accordingly.
173
self._children[pid]['avail'] = state != '\x00'
175
# Didn't receive anything. Child is most likely
177
d = self._children[pid]
184
child.send('bye, bye')
185
del self._children_to_purge[self._children_to_purge.index(child)]
186
self._last_purge = time.time()
188
# Try to match it with a child. (Do we need a reverse map?)
189
for pid,d in self._children.items():
190
if child is d['file']:
199
# See who and how many children are available.
200
availList = filter(lambda x: x[1]['avail'], self._children.items())
201
avail = len(availList)
203
if avail < self._minSpare:
204
# Need to spawn more children.
205
while avail < self._minSpare and \
206
len(self._children) < self._maxChildren:
207
if not self._spawnChild(sock): break
209
elif avail > self._maxSpare:
210
# Too many spares, kill off the extras.
211
pids = [x[0] for x in availList]
213
pids = pids[self._maxSpare:]
215
d = self._children[pid]
220
# Clean up all child processes.
221
self._cleanupChildren()
223
# Restore signal handlers.
224
self._restoreSignalHandlers()
226
# Return bool based on whether or not SIGHUP was received.
227
return self._hupReceived
229
def _cleanupChildren(self):
231
Closes all child sockets (letting those that are available know
232
that it's time to exit). Sends SIGINT to those that are currently
233
processing (and hopes that it finishes ASAP).
235
Any children remaining after 10 seconds are SIGKILLed.
237
# Let all children know it's time to go.
238
for pid,d in self._children.items():
239
if d['file'] is not None:
243
# Child is unavailable. SIGINT it.
245
os.kill(pid, signal.SIGINT)
247
if e[0] != errno.ESRCH:
250
def alrmHandler(signum, frame):
253
# Set up alarm to wake us up after 10 seconds.
254
oldSIGALRM = signal.getsignal(signal.SIGALRM)
255
signal.signal(signal.SIGALRM, alrmHandler)
258
# Wait for all children to die.
259
while len(self._children):
261
pid, status = os.wait()
263
if e[0] in (errno.ECHILD, errno.EINTR):
265
if self._children.has_key(pid):
266
del self._children[pid]
269
signal.signal(signal.SIGALRM, oldSIGALRM)
271
# Forcefully kill any remaining children.
272
for pid in self._children.keys():
274
os.kill(pid, signal.SIGKILL)
276
if e[0] != errno.ESRCH:
279
def _reapChildren(self):
280
"""Cleans up self._children whenever children die."""
283
pid, status = os.waitpid(-1, os.WNOHANG)
285
if e[0] == errno.ECHILD:
290
if self._children.has_key(pid): # Sanity check.
291
if self._children[pid]['file'] is not None:
292
self._children[pid]['file'].close()
293
self._children[pid]['file'] = None
294
del self._children[pid]
296
def _spawnChild(self, sock):
298
Spawn a single child. Returns True if successful, False otherwise.
300
# This socket pair is used for very simple communication between
301
# the parent and its children.
302
parent, child = socket.socketpair()
303
parent.setblocking(0)
304
setCloseOnExec(parent)
306
setCloseOnExec(child)
310
if e[0] in (errno.EAGAIN, errno.ENOMEM):
311
return False # Can't fork anymore.
316
# Put child into its own process group.
319
# Restore signal handlers.
320
self._restoreSignalHandlers()
321
# Close copies of child sockets.
322
for f in [x['file'] for x in self._children.values()
323
if x['file'] is not None]:
328
self._child(sock, parent)
329
except KeyboardInterrupt:
335
d = self._children[pid] = {}
340
def _isClientAllowed(self, addr):
341
"""Override to provide access control."""
344
def _notifyParent(self, parent, msg):
345
"""Send message to parent, ignoring EPIPE and retrying on EAGAIN"""
350
except socket.error, e:
351
if e[0] == errno.EPIPE:
352
return False # Parent is gone
353
if e[0] == errno.EAGAIN:
354
# Wait for socket change before sending again
355
select.select([], [parent], [])
359
def _child(self, sock, parent):
360
"""Main loop for children."""
363
# Re-seed random module
365
# urandom only exists in Python >= 2.4
366
if hasattr(os, 'urandom'):
368
preseed = os.urandom(16)
369
except NotImplementedError:
371
# Have doubts about this. random.seed will just hash the string
372
random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
376
# Wait for any activity on the main socket or parent socket.
377
r, w, e = select.select([sock, parent], [], [])
380
# If there's any activity on the parent socket, it
381
# means the parent wants us to die or has died itself.
386
# Otherwise, there's activity on the main socket...
388
clientSock, addr = sock.accept()
389
except socket.error, e:
390
if e[0] == errno.EAGAIN:
395
setCloseOnExec(clientSock)
397
# Check if this client is allowed.
398
if not self._isClientAllowed(addr):
402
# Notify parent we're no longer available.
403
self._notifyParent(parent, '\x00')
406
self._jobClass(clientSock, addr, *self._jobArgs).run()
408
# If we've serviced the maximum number of requests, exit.
409
if self._maxRequests > 0:
411
if requestCount >= self._maxRequests:
414
# Tell parent we're free again.
415
if not self._notifyParent(parent, '\xff'):
416
return # Parent is gone.
420
def _hupHandler(self, signum, frame):
    """Signal handler: stop the main loop and record that a HUP arrived.

    Clears ``_keepGoing`` (the flag tested by the ``while self._keepGoing``
    main loop) and sets ``_hupReceived`` (the value the main loop returns
    to its caller).

    ``signum`` and ``frame`` are the standard signal-handler arguments;
    neither is used.
    """
    self._keepGoing = False
    self._hupReceived = True
424
def _intHandler(self, signum, frame):
    """Signal handler: ask the main loop to stop.

    Clears ``_keepGoing`` so the ``while self._keepGoing`` loop exits.
    Unlike ``_hupHandler`` it does not touch ``_hupReceived``.

    ``signum`` and ``frame`` are the standard signal-handler arguments;
    neither is used.
    """
    self._keepGoing = False
427
def _chldHandler(self, signum, frame):
    """Signal handler that intentionally does nothing.

    ``signum`` and ``frame`` are the standard signal-handler arguments;
    neither is used, and no state on ``self`` is read or modified.
    """
    # Do nothing (breaks us out of select and allows us to reap children).
    pass
431
def _usr1Handler(self, signum, frame):
    """Signal handler: mark every child that still has a socket for purging.

    Replaces ``_children_to_purge`` with the ``'file'`` entry of every
    record in ``_children`` whose ``'file'`` is not None. Any previous
    contents of ``_children_to_purge`` are discarded, not merged.

    ``signum`` and ``frame`` are the standard signal-handler arguments;
    neither is used.
    """
    self._children_to_purge = [
        entry['file']
        for entry in self._children.values()
        if entry['file'] is not None
    ]
435
def _installSignalHandlers(self):
    """Install this server's signal handlers, remembering the previous ones.

    SIGINT and SIGTERM are routed to ``_intHandler``. Where the platform
    defines them, SIGHUP is routed to ``_hupHandler`` and SIGUSR1 to
    ``_usr1Handler``.

    The handlers that were in place beforehand are stored in ``_oldSIGs``
    as a list of ``(signum, handler)`` pairs, in installation order, which
    is the format ``_restoreSignalHandlers`` iterates over.
    """
    # One table drives both the snapshot and the installation, so a signal
    # can never be saved without being installed or vice versa.
    handlers = [
        (signal.SIGINT, self._intHandler),
        (signal.SIGTERM, self._intHandler),
    ]
    # SIGHUP / SIGUSR1 do not exist on every platform.
    if hasattr(signal, 'SIGHUP'):
        handlers.append((signal.SIGHUP, self._hupHandler))
    if hasattr(signal, 'SIGUSR1'):
        handlers.append((signal.SIGUSR1, self._usr1Handler))

    # Snapshot every previous handler before changing any of them.
    self._oldSIGs = [(sig, signal.getsignal(sig)) for sig, _ in handlers]

    for sig, handler in handlers:
        signal.signal(sig, handler)
452
def _restoreSignalHandlers(self):
    """Restores previous signal handlers.

    Walks the ``(signum, handler)`` pairs saved in ``_oldSIGs`` and
    re-installs each handler with ``signal.signal``.

    Entries whose saved handler is ``None`` are skipped:
    ``signal.getsignal`` returns ``None`` when the previous handler was not
    installed from Python, and ``signal.signal`` rejects ``None`` with a
    ``TypeError``. Such a handler cannot be restored from Python, so the
    remaining signals are still restored.
    """
    for signum, handler in self._oldSIGs:
        if handler is None:
            # Previous handler unknown to Python; nothing we can reinstall.
            continue
        signal.signal(signum, handler)
457
if __name__ == '__main__':
458
class TestJob(object):
459
def __init__(self, sock, addr):
463
print "Client connection opened from %s:%d" % self._addr
464
self._sock.send('Hello World!\n')
465
self._sock.setblocking(1)
468
print "Client connection closed from %s:%d" % self._addr
469
sock = socket.socket()
470
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
471
sock.bind(('', 8080))
472
sock.listen(socket.SOMAXCONN)
473
PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)