2
Socket server based on a worker thread pool. Doesn't use select.
4
Uses a single worker thread per client connection.
6
Pyro - Python Remote Objects. Copyright by Irmen de Jong.
7
irmen@razorvine.net - http://www.razorvine.net/python/Pyro
10
from __future__ import with_statement
11
import socket, logging, Queue
13
from Pyro.socketutil import SocketConnection, createSocket
14
from Pyro.errors import ConnectionClosedError, PyroError
16
from Pyro import threadutil
18
log=logging.getLogger("Pyro.socketserver.threadpool")
20
class SocketWorker(threadutil.Thread):
21
"""worker thread to process requests"""
22
def __init__(self, server, callback):
23
super(SocketWorker,self).__init__()
26
self.callback=callback
28
# jython names every thread 'Thread', so we improve that a little
29
self.setName("Thread-%d"%id(self))
33
log.debug("new worker %s", self.getName())
34
while self.running: # loop over all connections in the queue
35
self.csock,self.caddr = self.server.workqueue.get()
36
if self.csock is None and self.caddr is None:
37
# this was a 'stop' sentinel
40
log.debug("worker %s got a client connection %s", self.getName(), self.caddr)
41
self.csock=SocketConnection(self.csock)
42
if self.handleConnection(self.csock):
43
self.server.threadpool.updateWorking(1) # tell the pool we're working
45
while self.running: # loop over all requests during a single connection
47
self.callback.handleRequest(self.csock)
48
except (socket.error,ConnectionClosedError):
50
log.debug("worker %s client disconnected %s", self.getName(), self.caddr)
54
# make sure we tell the pool that we are no longer working
55
self.server.threadpool.updateWorking(-1)
56
# Note: we don't swallow exceptions here anymore because @Pyro.callback doesn't
57
# do anything anymore if we do (the re-raised exception would be swallowed...)
59
# exc_type, exc_value, _ = sys.exc_info()
60
# log.warn("swallow exception in worker %s: %s %s",self.getName(),exc_type,exc_value)
62
self.server.threadpool.remove(self)
63
log.debug("stopping worker %s", self.getName())
64
def handleConnection(self,conn):
66
if self.callback.handshake(conn):
68
except (socket.error, PyroError), x:
69
log.warn("error during connect: %s",x)
73
class ThreadPool(set):
74
lock=threadutil.Lock()
75
def __init__(self, server, callback):
77
self.__callback=callback
79
self.__lastshrink=time.time()
80
def attemptRemove(self, member):
82
if len(self)>Pyro.config.THREADPOOL_MINTHREADS:
83
super(ThreadPool,self).remove(member)
86
def remove(self, member):
89
super(ThreadPool,self).remove(member)
92
def attemptSpawn(self):
94
if len(self)<Pyro.config.THREADPOOL_MAXTHREADS:
95
worker=SocketWorker(self.__server, self.__callback)
100
def poolCritical(self):
101
idle=len(self)-self.__working
103
def updateWorking(self, number):
106
self.__working+=number
109
if threads>Pyro.config.THREADPOOL_MINTHREADS:
110
idle=threads-self.__working
111
if idle>Pyro.config.THREADPOOL_MINTHREADS and (time.time()-self.__lastshrink)>Pyro.config.THREADPOOL_IDLETIMEOUT:
112
for _ in range(idle-Pyro.config.THREADPOOL_MINTHREADS):
113
self.__server.workqueue.put((None,None)) # put a 'stop' sentinel in the worker queue to kill a worker
114
self.__lastshrink=time.time()
116
class SocketServer_Threadpool(object):
117
"""transport server for socket connections, worker thread pool version."""
118
def __init__(self, callbackObject, host, port, timeout=None):
119
log.info("starting thread pool socketserver")
121
self.sock=createSocket(bind=(host,port), timeout=timeout)
122
self._socketaddr=self.sock.getsockname()
123
if self._socketaddr[0].startswith("127."):
124
if host is None or host.lower()!="localhost" and not host.startswith("127."):
125
log.warn("weird DNS setup: %s resolves to localhost (127.x.x.x)",host)
126
host=host or self._socketaddr[0]
127
port=port or self._socketaddr[1]
128
self.locationStr="%s:%d" % (host,port)
129
self.threadpool=ThreadPool(self, callbackObject)
130
self.workqueue=Queue.Queue()
131
for _ in range(Pyro.config.THREADPOOL_MINTHREADS):
132
self.threadpool.attemptSpawn()
133
log.info("%d worker threads started", len(self.threadpool))
135
if self.sock is not None:
138
def requestLoop(self, loopCondition=lambda:True):
139
log.debug("threadpool server requestloop")
140
while (self.sock is not None) and loopCondition():
142
self.handleRequests(None)
144
if not loopCondition():
145
# swallow the socket error if loop terminates anyway
146
# this can occur if we are asked to shutdown, socket can be invalid then
150
except KeyboardInterrupt:
151
log.debug("stopping on break signal")
153
log.debug("threadpool server exits requestloop")
154
def handleRequests(self, eventsockets):
156
# we only react on events on our own server socket.
157
# all other (client) sockets are owned by their individual threads.
158
csock, caddr=self.sock.accept()
159
log.debug("connection from %s",caddr)
160
if Pyro.config.COMMTIMEOUT:
161
csock.settimeout(Pyro.config.COMMTIMEOUT)
162
if self.threadpool.poolCritical():
163
self.threadpool.attemptSpawn()
164
self.workqueue.put((csock,caddr))
165
except socket.timeout:
166
pass # just continue the loop on a timeout on accept
168
def close(self, joinWorkers=True):
169
log.debug("closing threadpool server")
176
for worker in self.threadpool.copy():
178
csock=getattr(worker,"csock",None)
180
csock.close() # terminate socket that the worker might be listening on
181
if self.workqueue is not None:
182
self.workqueue.put((None,None)) # put a 'stop' sentinel in the worker queue
185
worker=self.threadpool.pop()
192
return self.sock.fileno()
194
# the server socket is all we care about, all client sockets are running in their own threads
197
def pingConnection(self):
198
"""bit of a hack to trigger a blocking server to get out of the loop, useful at clean shutdowns"""
200
sock=createSocket(connect=self._socketaddr)
201
sock.send("!!!!!!!!!!!!!!!!!!!!!")