~ubuntu-branches/debian/sid/pyro/sid

« back to all changes in this revision

Viewing changes to src/Pyro/socketserver/threadpoolserver.py

  • Committer: Bazaar Package Importer
  • Author(s): Carl Chenet, Carl Chenet, Jakub Wilk
  • Date: 2010-09-14 01:04:28 UTC
  • Revision ID: james.westby@ubuntu.com-20100914010428-02r7p1rzr7jvw94z
Tags: 1:3.9.1-2
[Carl Chenet]
* revert to the 3.9.1-1 package because the development status
  of the 4.1 package makes it unsuitable for stable use;
  DPMT svn revision #8557 (Closes: #589172)
* added debian/source
* added debian/source/format
* package is now 3.0 (quilt) source format
* debian/control
  - Bump Standards-Version to 3.9.1

[Jakub Wilk]
* Add ‘XS-Python-Version: >= 2.5’ to prevent bytecompilation with python2.4
  (closes: #589053).

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""
2
 
Socket server based on a worker thread pool. Doesn't use select.
3
 
 
4
 
Uses a single worker thread per client connection.
5
 
 
6
 
Pyro - Python Remote Objects.  Copyright by Irmen de Jong.
7
 
irmen@razorvine.net - http://www.razorvine.net/python/Pyro
8
 
"""
9
 
 
10
 
from __future__ import with_statement
11
 
import socket, logging, Queue
12
 
import time, os
13
 
from Pyro.socketutil import SocketConnection, createSocket
14
 
from Pyro.errors import ConnectionClosedError, PyroError
15
 
import Pyro.config
16
 
from Pyro import threadutil
17
 
 
18
 
# module-level logger; the worker thread and the server class in this module log through it
log=logging.getLogger("Pyro.socketserver.threadpool")
19
 
 
20
 
class SocketWorker(threadutil.Thread):
    """Worker thread that serves client connections handed out by the server.

    A worker repeatedly takes a (socket, address) pair from
    ``server.workqueue``, runs the handshake through the callback object and
    then keeps handling requests on that single connection until the client
    goes away or the worker is told to stop. A (None, None) pair on the queue
    is the 'stop' sentinel that ends the thread.
    """

    def __init__(self, server, callback):
        """Create the worker as a daemon thread (it is not started here).

        server   -- object providing the ``workqueue`` and ``threadpool`` attributes
        callback -- object providing ``handshake(conn)`` and ``handleRequest(conn)``
        """
        super(SocketWorker,self).__init__()
        self.setDaemon(True)
        self.server=server
        self.callback=callback
        if os.name=="java":
            # jython names every thread 'Thread', so we improve that a little
            self.setName("Thread-%d"%id(self))

    def run(self):
        """Serve queued connections until a stop sentinel arrives or ``running`` is cleared.

        Exceptions other than socket.error / ConnectionClosedError raised by the
        callback are not caught: they end this thread. The client socket is
        closed and the pool bookkeeping is updated in every case.
        """
        self.running=True
        try:
            log.debug("new worker %s", self.getName())
            while self.running: # loop over all connections in the queue
                self.csock,self.caddr = self.server.workqueue.get()
                if self.csock is None and self.caddr is None:
                    # this was a 'stop' sentinel
                    self.running=False
                    break
                log.debug("worker %s got a client connection %s", self.getName(), self.caddr)
                self.csock=SocketConnection(self.csock)
                if self.handleConnection(self.csock):
                    self.server.threadpool.updateWorking(1)  # tell the pool we're working
                    try:
                        while self.running:   # loop over all requests during a single connection
                            try:
                                self.callback.handleRequest(self.csock)
                            except (socket.error,ConnectionClosedError):
                                # client went away.
                                log.debug("worker %s client disconnected %s", self.getName(), self.caddr)
                                break
                    finally:
                        try:
                            # close in the cleanup path so that an unexpected exception
                            # from handleRequest cannot leak the client socket
                            self.csock.close()
                        finally:
                            # make sure we tell the pool that we are no longer working,
                            # even if closing the socket itself fails
                            self.server.threadpool.updateWorking(-1)
        # Note: we don't swallow exceptions here anymore because @Pyro.callback doesn't
        #       do anything anymore if we do (the re-raised exception would be swallowed...)
        finally:
            self.server.threadpool.remove(self)
            log.debug("stopping worker %s", self.getName())

    def handleConnection(self,conn):
        """Run the callback's handshake on conn and report whether it was accepted.

        Returns True when ``callback.handshake(conn)`` returns a true value.
        Otherwise (false result, or socket.error / PyroError during the
        handshake) the connection is closed and False is returned. Any other
        exception propagates to the caller.
        """
        # sys.exc_info() instead of binding the exception in the except clause:
        # that keeps this method valid syntax for Python 2.5 as well as newer versions
        import sys
        try:
            accepted=self.callback.handshake(conn)
        except (socket.error, PyroError):
            log.warn("error during connect: %s",sys.exc_info()[1])
            accepted=False
        if accepted:
            return True
        # rejected or failed handshake: nobody will use this connection anymore
        conn.close()
        return False
72
 
                    
73
 
class ThreadPool(set):
    """Set of worker threads plus a count of how many of them are busy.

    The pool size is kept between Pyro.config.THREADPOOL_MINTHREADS and
    Pyro.config.THREADPOOL_MAXTHREADS. Workers are stopped by putting
    (None, None) 'stop' sentinels on the server's work queue.
    All reads and writes of the pool state happen while holding ``lock``.
    """
    # NOTE: class attribute, so a single lock object is shared by all ThreadPool instances.
    # Presumably a plain non-reentrant lock; no method below acquires it twice.
    lock=threadutil.Lock()

    def __init__(self, server, callback):
        """Remember the server (for its ``workqueue``) and the callback passed to new workers."""
        self.__server=server
        self.__callback=callback
        self.__working=0                # number of workers currently serving a connection
        self.__lastshrink=time.time()   # timestamp of the last time stop sentinels were queued

    def attemptRemove(self, member):
        """Remove member if the pool is above the minimum size; return True if removed.

        Raises KeyError if the pool is large enough but member is not in it.
        """
        with self.lock:
            if len(self)>Pyro.config.THREADPOOL_MINTHREADS:
                super(ThreadPool,self).remove(member)
                return True
            return False

    def remove(self, member):
        """Remove member from the pool; a member that is not present is ignored."""
        with self.lock:
            try:
                super(ThreadPool,self).remove(member)
            except KeyError:
                pass

    def attemptSpawn(self):
        """Create and start one more worker if below the maximum size; return True if spawned."""
        with self.lock:
            if len(self)<Pyro.config.THREADPOOL_MAXTHREADS:
                worker=SocketWorker(self.__server, self.__callback)
                self.add(worker)
                worker.start()
                return True
            return False

    def poolCritical(self):
        """Return True when there is no idle worker left (all pool members are busy)."""
        with self.lock:
            # read size and busy count together so the two values are consistent
            idle=len(self)-self.__working
            return idle<=0

    def updateWorking(self, number):
        """Add number (+1 / -1) to the busy-worker count, after giving the pool a chance to shrink."""
        # shrink() takes the lock itself, so it has to be called before we acquire it here
        self.shrink()
        with self.lock:
            self.__working+=number

    def shrink(self):
        """Queue stop sentinels for surplus idle workers.

        Nothing happens unless there are more than THREADPOOL_MINTHREADS idle
        workers and more than THREADPOOL_IDLETIMEOUT seconds have passed since
        the previous shrink. The whole check-and-queue step runs under the
        lock so that concurrent callers cannot both queue a batch of sentinels.
        """
        with self.lock:
            threads=len(self)
            if threads<=Pyro.config.THREADPOOL_MINTHREADS:
                return
            idle=threads-self.__working
            if idle>Pyro.config.THREADPOOL_MINTHREADS and (time.time()-self.__lastshrink)>Pyro.config.THREADPOOL_IDLETIMEOUT:
                for _ in range(idle-Pyro.config.THREADPOOL_MINTHREADS):
                    self.__server.workqueue.put((None,None)) # put a 'stop' sentinel in the worker queue to kill a worker
                self.__lastshrink=time.time()
115
 
                    
116
 
class SocketServer_Threadpool(object):
    """transport server for socket connections, worker thread pool version.

    The server thread only accepts connections on the listening socket and
    puts them on ``workqueue``; the worker threads in ``threadpool`` take them
    from there and own the client sockets from then on.
    """

    def __init__(self, callbackObject, host, port, timeout=None):
        """Bind the server socket and start the minimum number of worker threads.

        callbackObject -- passed on to the thread pool for use by the workers
        host, port     -- address to bind; a false value is replaced by what the
                          bound socket reports when building ``locationStr``
        timeout        -- passed to createSocket for the server socket
        """
        log.info("starting thread pool socketserver")
        self.sock=None   # set first so __del__ and close() work even if createSocket fails
        self.sock=createSocket(bind=(host,port), timeout=timeout)
        self._socketaddr=self.sock.getsockname()
        if self._socketaddr[0].startswith("127."):
            # bound to a loopback address: warn unless the caller explicitly asked for one
            if host is None or host.lower()!="localhost" and not host.startswith("127."):
                log.warn("weird DNS setup: %s resolves to localhost (127.x.x.x)",host)
        host=host or self._socketaddr[0]
        port=port or self._socketaddr[1]
        self.locationStr="%s:%d" % (host,port)
        self.threadpool=ThreadPool(self, callbackObject)
        # the queue must exist before the first worker is started, workers read it right away
        self.workqueue=Queue.Queue()
        for _ in range(Pyro.config.THREADPOOL_MINTHREADS):
            self.threadpool.attemptSpawn()
        log.info("%d worker threads started", len(self.threadpool))

    def __del__(self):
        """Close the server socket if it is still open."""
        if self.sock is not None:
            self.sock.close()

    def requestLoop(self, loopCondition=lambda:True):
        """Accept connections until the socket is closed or loopCondition() returns false.

        A socket.error is re-raised unless loopCondition() has become false in
        the meantime; KeyboardInterrupt ends the loop quietly.
        """
        log.debug("threadpool server requestloop")
        while (self.sock is not None) and loopCondition():
            try:
                self.handleRequests(None)
            except socket.error:
                if not loopCondition():
                    # swallow the socket error if loop terminates anyway
                    # this can occur if we are asked to shutdown, socket can be invalid then
                    break
                else:
                    raise
            except KeyboardInterrupt:
                log.debug("stopping on break signal")
                break
        log.debug("threadpool server exits requestloop")

    def handleRequests(self, eventsockets):
        """Accept one client connection and queue it for a worker.

        eventsockets is not used. A timeout on accept is ignored; other socket
        errors propagate to the caller.
        """
        try:
            # we only react on events on our own server socket.
            # all other (client) sockets are owned by their individual threads.
            csock, caddr=self.sock.accept()
            log.debug("connection from %s",caddr)
            if Pyro.config.COMMTIMEOUT:
                csock.settimeout(Pyro.config.COMMTIMEOUT)
            if self.threadpool.poolCritical():
                # no idle worker available: try to add one before queueing the connection
                self.threadpool.attemptSpawn()
            self.workqueue.put((csock,caddr))
        except socket.timeout:
            pass  # just continue the loop on a timeout on accept

    def close(self, joinWorkers=True): 
        """Close the server socket and tell all workers to stop.

        joinWorkers -- when true, wait for every worker thread to finish
        """
        log.debug("closing threadpool server")
        if self.sock:
            try:
                self.sock.close()
            except Exception:
                pass
            self.sock=None
        # iterate over a copy: workers remove themselves from the pool when they stop
        for worker in self.threadpool.copy():
            worker.running=False
            csock=getattr(worker,"csock",None)
            if csock:
                csock.close()    # terminate socket that the worker might be listening on
            if self.workqueue is not None:
                self.workqueue.put((None,None)) # put a 'stop' sentinel in the worker queue
        while joinWorkers:
            try:
                worker=self.threadpool.pop()
            except KeyError:
                # pool is empty, all workers have been joined
                break
            else:
                worker.join()

    def fileno(self):
        """Return the file descriptor of the server socket."""
        return self.sock.fileno()

    def sockets(self):
        """Return the list of sockets to wait on: only the server socket."""
        # the server socket is all we care about, all client sockets are running in their own threads 
        return [self.sock]

    def pingConnection(self):
        """bit of a hack to trigger a blocking server to get out of the loop, useful at clean shutdowns"""
        try:
            sock=createSocket(connect=self._socketaddr)
            try:
                sock.send("!!!!!!!!!!!!!!!!!!!!!")
            finally:
                # always release the socket, also when the send fails
                sock.close()
        except socket.error:
            # best effort only: the server may already be gone
            pass
205