~ubuntu-branches/ubuntu/natty/moin/natty-updates

« back to all changes in this revision

Viewing changes to MoinMoin/support/flup/server/preforkserver.py

  • Committer: Bazaar Package Importer
  • Author(s): Jonas Smedegaard
  • Date: 2008-06-22 21:17:13 UTC
  • mto: This revision was merged to the branch mainline in revision 18.
  • Revision ID: james.westby@ubuntu.com-20080622211713-inlv5k4eifxckelr
ImportĀ upstreamĀ versionĀ 1.7.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2005 Allan Saddi <allan@saddi.com>
2
 
# All rights reserved.
3
 
#
4
 
# Redistribution and use in source and binary forms, with or without
5
 
# modification, are permitted provided that the following conditions
6
 
# are met:
7
 
# 1. Redistributions of source code must retain the above copyright
8
 
#    notice, this list of conditions and the following disclaimer.
9
 
# 2. Redistributions in binary form must reproduce the above copyright
10
 
#    notice, this list of conditions and the following disclaimer in the
11
 
#    documentation and/or other materials provided with the distribution.
12
 
#
13
 
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
14
 
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15
 
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16
 
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
17
 
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18
 
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19
 
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20
 
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
21
 
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
22
 
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
23
 
# SUCH DAMAGE.
24
 
#
25
 
# $Id$
26
 
 
27
 
__author__ = 'Allan Saddi <allan@saddi.com>'
28
 
__version__ = '$Revision$'
29
 
 
30
 
import sys
31
 
import os
32
 
import socket
33
 
import select
34
 
import errno
35
 
import signal
36
 
import random
37
 
import time
38
 
 
39
 
try:
40
 
    import fcntl
41
 
except ImportError:
42
 
    def setCloseOnExec(sock):
43
 
        pass
44
 
else:
45
 
    def setCloseOnExec(sock):
46
 
        fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
47
 
 
48
 
# If running Python < 2.4, require eunuchs module for socket.socketpair().
49
 
# See <http://www.inoi.fi/open/trac/eunuchs>.
50
 
if not hasattr(socket, 'socketpair'):
51
 
    try:
52
 
        import eunuchs.socketpair
53
 
    except ImportError:
54
 
        # TODO: Other alternatives? Perhaps using os.pipe()?
55
 
        raise ImportError, 'Requires eunuchs module for Python < 2.4'
56
 
 
57
 
    def socketpair():
58
 
        s1, s2 = eunuchs.socketpair.socketpair()
59
 
        p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
60
 
                socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
61
 
        os.close(s1)
62
 
        os.close(s2)
63
 
        return p, c
64
 
 
65
 
    socket.socketpair = socketpair
66
 
 
67
 
class PreforkServer(object):
68
 
    """
69
 
    A preforked server model conceptually similar to Apache httpd(2). At
70
 
    any given time, ensures there are at least minSpare children ready to
71
 
    process new requests (up to a maximum of maxChildren children total).
72
 
    If the number of idle children is ever above maxSpare, the extra
73
 
    children are killed.
74
 
 
75
 
    If maxRequests is positive, each child will only handle that many
76
 
    requests in its lifetime before exiting.
77
 
    
78
 
    jobClass should be a class whose constructor takes at least two
79
 
    arguments: the client socket and client address. jobArgs, which
80
 
    must be a list or tuple, is any additional (static) arguments you
81
 
    wish to pass to the constructor.
82
 
 
83
 
    jobClass should have a run() method (taking no arguments) that does
84
 
    the actual work. When run() returns, the request is considered
85
 
    complete and the child process moves to idle state.
86
 
    """
87
 
    def __init__(self, minSpare=1, maxSpare=5, maxChildren=50,
88
 
                 maxRequests=0, jobClass=None, jobArgs=()):
89
 
        self._minSpare = minSpare
90
 
        self._maxSpare = maxSpare
91
 
        self._maxChildren = max(maxSpare, maxChildren)
92
 
        self._maxRequests = maxRequests
93
 
        self._jobClass = jobClass
94
 
        self._jobArgs = jobArgs
95
 
 
96
 
        # Internal state of children. Maps pids to dictionaries with two
97
 
        # members: 'file' and 'avail'. 'file' is the socket to that
98
 
        # individidual child and 'avail' is whether or not the child is
99
 
        # free to process requests.
100
 
        self._children = {}
101
 
 
102
 
        self._children_to_purge = []
103
 
        self._last_purge = 0
104
 
 
105
 
        if minSpare < 1:
106
 
            raise ValueError("minSpare must be at least 1!")
107
 
        if maxSpare < minSpare:
108
 
            raise ValueError("maxSpare must be greater than, or equal to, minSpare!")
109
 
 
110
 
    def run(self, sock):
111
 
        """
112
 
        The main loop. Pass a socket that is ready to accept() client
113
 
        connections. Return value will be True or False indiciating whether
114
 
        or not the loop was exited due to SIGHUP.
115
 
        """
116
 
        # Set up signal handlers.
117
 
        self._keepGoing = True
118
 
        self._hupReceived = False
119
 
        self._installSignalHandlers()
120
 
 
121
 
        # Don't want operations on main socket to block.
122
 
        sock.setblocking(0)
123
 
 
124
 
        # Set close-on-exec
125
 
        setCloseOnExec(sock)
126
 
        
127
 
        # Main loop.
128
 
        while self._keepGoing:
129
 
            # Maintain minimum number of children. Note that we are checking
130
 
            # the absolute number of children, not the number of "available"
131
 
            # children. We explicitly test against _maxSpare to maintain
132
 
            # an *optimistic* absolute minimum. The number of children will
133
 
            # always be in the range [_maxSpare, _maxChildren].
134
 
            while len(self._children) < self._maxSpare:
135
 
                if not self._spawnChild(sock): break
136
 
 
137
 
            # Wait on any socket activity from live children.
138
 
            r = [x['file'] for x in self._children.values()
139
 
                 if x['file'] is not None]
140
 
 
141
 
            if len(r) == len(self._children) and not self._children_to_purge:
142
 
                timeout = None
143
 
            else:
144
 
                # There are dead children that need to be reaped, ensure
145
 
                # that they are by timing out, if necessary. Or there are some
146
 
                # children that need to die.
147
 
                timeout = 2
148
 
 
149
 
            w = []
150
 
            if (time.time() > self._last_purge + 10):
151
 
                w = [x for x in self._children_to_purge if x.fileno() != -1]
152
 
            try:
153
 
                r, w, e = select.select(r, w, [], timeout)
154
 
            except select.error, e:
155
 
                if e[0] != errno.EINTR:
156
 
                    raise
157
 
 
158
 
            # Scan child sockets and tend to those that need attention.
159
 
            for child in r:
160
 
                # Receive status byte.
161
 
                try:
162
 
                    state = child.recv(1)
163
 
                except socket.error, e:
164
 
                    if e[0] in (errno.EAGAIN, errno.EINTR):
165
 
                        # Guess it really didn't need attention?
166
 
                        continue
167
 
                    raise
168
 
                # Try to match it with a child. (Do we need a reverse map?)
169
 
                for pid,d in self._children.items():
170
 
                    if child is d['file']:
171
 
                        if state:
172
 
                            # Set availability status accordingly.
173
 
                            self._children[pid]['avail'] = state != '\x00'
174
 
                        else:
175
 
                            # Didn't receive anything. Child is most likely
176
 
                            # dead.
177
 
                            d = self._children[pid]
178
 
                            d['file'].close()
179
 
                            d['file'] = None
180
 
                            d['avail'] = False
181
 
 
182
 
            for child in w:
183
 
                # purging child
184
 
                child.send('bye, bye')
185
 
                del self._children_to_purge[self._children_to_purge.index(child)]
186
 
                self._last_purge = time.time()
187
 
 
188
 
                # Try to match it with a child. (Do we need a reverse map?)
189
 
                for pid,d in self._children.items():
190
 
                    if child is d['file']:
191
 
                        d['file'].close()
192
 
                        d['file'] = None
193
 
                        d['avail'] = False
194
 
                break
195
 
 
196
 
            # Reap children.
197
 
            self._reapChildren()
198
 
 
199
 
            # See who and how many children are available.
200
 
            availList = filter(lambda x: x[1]['avail'], self._children.items())
201
 
            avail = len(availList)
202
 
 
203
 
            if avail < self._minSpare:
204
 
                # Need to spawn more children.
205
 
                while avail < self._minSpare and \
206
 
                      len(self._children) < self._maxChildren:
207
 
                    if not self._spawnChild(sock): break
208
 
                    avail += 1
209
 
            elif avail > self._maxSpare:
210
 
                # Too many spares, kill off the extras.
211
 
                pids = [x[0] for x in availList]
212
 
                pids.sort()
213
 
                pids = pids[self._maxSpare:]
214
 
                for pid in pids:
215
 
                    d = self._children[pid]
216
 
                    d['file'].close()
217
 
                    d['file'] = None
218
 
                    d['avail'] = False
219
 
 
220
 
        # Clean up all child processes.
221
 
        self._cleanupChildren()
222
 
 
223
 
        # Restore signal handlers.
224
 
        self._restoreSignalHandlers()
225
 
 
226
 
        # Return bool based on whether or not SIGHUP was received.
227
 
        return self._hupReceived
228
 
 
229
 
    def _cleanupChildren(self):
230
 
        """
231
 
        Closes all child sockets (letting those that are available know
232
 
        that it's time to exit). Sends SIGINT to those that are currently
233
 
        processing (and hopes that it finishses ASAP).
234
 
 
235
 
        Any children remaining after 10 seconds is SIGKILLed.
236
 
        """
237
 
        # Let all children know it's time to go.
238
 
        for pid,d in self._children.items():
239
 
            if d['file'] is not None:
240
 
                d['file'].close()
241
 
                d['file'] = None
242
 
            if not d['avail']:
243
 
                # Child is unavailable. SIGINT it.
244
 
                try:
245
 
                    os.kill(pid, signal.SIGINT)
246
 
                except OSError, e:
247
 
                    if e[0] != errno.ESRCH:
248
 
                        raise
249
 
 
250
 
        def alrmHandler(signum, frame):
251
 
            pass
252
 
 
253
 
        # Set up alarm to wake us up after 10 seconds.
254
 
        oldSIGALRM = signal.getsignal(signal.SIGALRM)
255
 
        signal.signal(signal.SIGALRM, alrmHandler)
256
 
        signal.alarm(10)
257
 
 
258
 
        # Wait for all children to die.
259
 
        while len(self._children):
260
 
            try:
261
 
                pid, status = os.wait()
262
 
            except OSError, e:
263
 
                if e[0] in (errno.ECHILD, errno.EINTR):
264
 
                    break
265
 
            if self._children.has_key(pid):
266
 
                del self._children[pid]
267
 
 
268
 
        signal.alarm(0)
269
 
        signal.signal(signal.SIGALRM, oldSIGALRM)
270
 
 
271
 
        # Forcefully kill any remaining children.
272
 
        for pid in self._children.keys():
273
 
            try:
274
 
                os.kill(pid, signal.SIGKILL)
275
 
            except OSError, e:
276
 
                if e[0] != errno.ESRCH:
277
 
                    raise
278
 
 
279
 
    def _reapChildren(self):
280
 
        """Cleans up self._children whenever children die."""
281
 
        while True:
282
 
            try:
283
 
                pid, status = os.waitpid(-1, os.WNOHANG)
284
 
            except OSError, e:
285
 
                if e[0] == errno.ECHILD:
286
 
                    break
287
 
                raise
288
 
            if pid <= 0:
289
 
                break
290
 
            if self._children.has_key(pid): # Sanity check.
291
 
                if self._children[pid]['file'] is not None:
292
 
                    self._children[pid]['file'].close()
293
 
                    self._children[pid]['file'] = None
294
 
                del self._children[pid]
295
 
 
296
 
    def _spawnChild(self, sock):
297
 
        """
298
 
        Spawn a single child. Returns True if successful, False otherwise.
299
 
        """
300
 
        # This socket pair is used for very simple communication between
301
 
        # the parent and its children.
302
 
        parent, child = socket.socketpair()
303
 
        parent.setblocking(0)
304
 
        setCloseOnExec(parent)
305
 
        child.setblocking(0)
306
 
        setCloseOnExec(child)
307
 
        try:
308
 
            pid = os.fork()
309
 
        except OSError, e:
310
 
            if e[0] in (errno.EAGAIN, errno.ENOMEM):
311
 
                return False # Can't fork anymore.
312
 
            raise
313
 
        if not pid:
314
 
            # Child
315
 
            child.close()
316
 
            # Put child into its own process group.
317
 
            pid = os.getpid()
318
 
            os.setpgid(pid, pid)
319
 
            # Restore signal handlers.
320
 
            self._restoreSignalHandlers()
321
 
            # Close copies of child sockets.
322
 
            for f in [x['file'] for x in self._children.values()
323
 
                      if x['file'] is not None]:
324
 
                f.close()
325
 
            self._children = {}
326
 
            try:
327
 
                # Enter main loop.
328
 
                self._child(sock, parent)
329
 
            except KeyboardInterrupt:
330
 
                pass
331
 
            sys.exit(0)
332
 
        else:
333
 
            # Parent
334
 
            parent.close()
335
 
            d = self._children[pid] = {}
336
 
            d['file'] = child
337
 
            d['avail'] = True
338
 
            return True
339
 
 
340
 
    def _isClientAllowed(self, addr):
341
 
        """Override to provide access control."""
342
 
        return True
343
 
 
344
 
    def _notifyParent(self, parent, msg):
345
 
        """Send message to parent, ignoring EPIPE and retrying on EAGAIN"""
346
 
        while True:
347
 
            try:
348
 
                parent.send(msg)
349
 
                return True
350
 
            except socket.error, e:
351
 
                if e[0] == errno.EPIPE:
352
 
                    return False # Parent is gone
353
 
                if e[0] == errno.EAGAIN:
354
 
                    # Wait for socket change before sending again
355
 
                    select.select([], [parent], [])
356
 
                else:
357
 
                    raise
358
 
                
359
 
    def _child(self, sock, parent):
360
 
        """Main loop for children."""
361
 
        requestCount = 0
362
 
 
363
 
        # Re-seed random module
364
 
        preseed = ''
365
 
        # urandom only exists in Python >= 2.4
366
 
        if hasattr(os, 'urandom'):
367
 
            try:
368
 
                preseed = os.urandom(16)
369
 
            except NotImplementedError:
370
 
                pass
371
 
        # Have doubts about this. random.seed will just hash the string
372
 
        random.seed('%s%s%s' % (preseed, os.getpid(), time.time()))
373
 
        del preseed
374
 
 
375
 
        while True:
376
 
            # Wait for any activity on the main socket or parent socket.
377
 
            r, w, e = select.select([sock, parent], [], [])
378
 
 
379
 
            for f in r:
380
 
                # If there's any activity on the parent socket, it
381
 
                # means the parent wants us to die or has died itself.
382
 
                # Either way, exit.
383
 
                if f is parent:
384
 
                    return
385
 
 
386
 
            # Otherwise, there's activity on the main socket...
387
 
            try:
388
 
                clientSock, addr = sock.accept()
389
 
            except socket.error, e:
390
 
                if e[0] == errno.EAGAIN:
391
 
                    # Or maybe not.
392
 
                    continue
393
 
                raise
394
 
 
395
 
            setCloseOnExec(clientSock)
396
 
            
397
 
            # Check if this client is allowed.
398
 
            if not self._isClientAllowed(addr):
399
 
                clientSock.close()
400
 
                continue
401
 
 
402
 
            # Notify parent we're no longer available.
403
 
            self._notifyParent(parent, '\x00')
404
 
 
405
 
            # Do the job.
406
 
            self._jobClass(clientSock, addr, *self._jobArgs).run()
407
 
 
408
 
            # If we've serviced the maximum number of requests, exit.
409
 
            if self._maxRequests > 0:
410
 
                requestCount += 1
411
 
                if requestCount >= self._maxRequests:
412
 
                    break
413
 
                
414
 
            # Tell parent we're free again.
415
 
            if not self._notifyParent(parent, '\xff'):
416
 
                return # Parent is gone.
417
 
 
418
 
    # Signal handlers
419
 
 
420
 
    def _hupHandler(self, signum, frame):
421
 
        self._keepGoing = False
422
 
        self._hupReceived = True
423
 
 
424
 
    def _intHandler(self, signum, frame):
425
 
        self._keepGoing = False
426
 
 
427
 
    def _chldHandler(self, signum, frame):
428
 
        # Do nothing (breaks us out of select and allows us to reap children).
429
 
        pass
430
 
 
431
 
    def _usr1Handler(self, signum, frame):
432
 
        self._children_to_purge = [x['file'] for x in self._children.values()
433
 
                                   if x['file'] is not None]
434
 
 
435
 
    def _installSignalHandlers(self):
436
 
        supportedSignals = [signal.SIGINT, signal.SIGTERM]
437
 
        if hasattr(signal, 'SIGHUP'):
438
 
            supportedSignals.append(signal.SIGHUP)
439
 
        if hasattr(signal, 'SIGUSR1'):
440
 
            supportedSignals.append(signal.SIGUSR1)
441
 
 
442
 
        self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
443
 
 
444
 
        for sig in supportedSignals:
445
 
            if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
446
 
                signal.signal(sig, self._hupHandler)
447
 
            elif hasattr(signal, 'SIGUSR1') and sig == signal.SIGUSR1:
448
 
                signal.signal(sig, self._usr1Handler)
449
 
            else:
450
 
                signal.signal(sig, self._intHandler)
451
 
 
452
 
    def _restoreSignalHandlers(self):
453
 
        """Restores previous signal handlers."""
454
 
        for signum,handler in self._oldSIGs:
455
 
            signal.signal(signum, handler)
456
 
 
457
 
if __name__ == '__main__':
458
 
    class TestJob(object):
459
 
        def __init__(self, sock, addr):
460
 
            self._sock = sock
461
 
            self._addr = addr
462
 
        def run(self):
463
 
            print "Client connection opened from %s:%d" % self._addr
464
 
            self._sock.send('Hello World!\n')
465
 
            self._sock.setblocking(1)
466
 
            self._sock.recv(1)
467
 
            self._sock.close()
468
 
            print "Client connection closed from %s:%d" % self._addr
469
 
    sock = socket.socket()
470
 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
471
 
    sock.bind(('', 8080))
472
 
    sock.listen(socket.SOMAXCONN)
473
 
    PreforkServer(maxChildren=10, jobClass=TestJob).run(sock)