88
# This socket was given out, but not explicitly returned. Perhaps
89
# the socket was assigned to a thread local for a request, but the
90
# request wasn't ended before the thread died. Reclaim the socket
94
# Return a copy of self rather than self -- the Python docs
95
# discourage postponing deletion by adding a reference to self.
96
copy = SocketInfo(self.sock, self.poolref)
97
copy.authset = self.authset
98
pool.return_socket(copy)
100
# Close socket now rather than awaiting garbage collector
103
75
def __eq__(self, other):
76
# Need to check if other is NO_REQUEST or NO_SOCKET_YET, and then check
77
# if its sock is the same as ours
104
78
return hasattr(other, 'sock') and self.sock == other.sock
80
def __ne__(self, other):
81
return not self == other
106
83
def __hash__(self):
107
84
return hash(self.sock)
109
86
def __repr__(self):
110
return "SocketInfo(%s, %s)%s at %s" % (
111
repr(self.sock), repr(self.poolref()),
87
return "SocketInfo(%s)%s at %s" % (
112
89
self.closed and " CLOSED" or "",
117
class BasePool(object):
118
def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl):
94
# Do *not* explicitly inherit from object or Jython won't call __del__
95
# http://bugs.jython.org/issue1057
97
def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl,
121
101
- `pair`: a (hostname, port) tuple
164
152
for sock_info in sockets: sock_info.close()
166
# Reset subclass's data structures
169
# If we were in a request before the reset, then delete the request
170
# socket, but resume the request with a new socket the next time
171
# get_socket() is called.
172
if request_state != NO_REQUEST:
173
self._set_request_state(NO_SOCKET_YET)
175
154
def create_connection(self, pair):
176
155
"""Connect to *pair* and return the socket object.
178
157
This is a modified version of create_connection from
181
# Don't try IPv6 if we don't support it.
160
host, port = pair or self.pair
162
# Check if dealing with a unix domain socket
163
if host.endswith('.sock'):
164
if not hasattr(socket, "AF_UNIX"):
165
raise ConnectionFailure("UNIX-sockets are not supported "
167
sock = socket.socket(socket.AF_UNIX)
171
except socket.error, e:
176
# Don't try IPv6 if we don't support it. Also skip it if host
177
# is 'localhost' (::1 is fine). Avoids slow connect issues
182
179
family = socket.AF_INET
180
if socket.has_ipv6 and host != 'localhost':
184
181
family = socket.AF_UNSPEC
186
host, port = pair or self.pair
188
184
for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
189
185
af, socktype, proto, dummy, sa = res
284
280
# have no socket assigned to the request yet.
285
281
self._set_request_state(NO_SOCKET_YET)
283
self._request_counter.inc()
287
285
def in_request(self):
288
return self._get_request_state() != NO_REQUEST
286
return bool(self._request_counter.get())
290
288
def end_request(self):
291
sock_info = self._get_request_state()
292
self._set_request_state(NO_REQUEST)
293
self.return_socket(sock_info)
289
tid = self._ident.get()
291
# Check if start_request has ever been called in this thread / greenlet
292
count = self._request_counter.get()
294
self._request_counter.dec()
297
sock_info = self._get_request_state()
298
self._set_request_state(NO_REQUEST)
299
if sock_info not in (NO_REQUEST, NO_SOCKET_YET):
300
self._return_socket(sock_info)
295
302
def discard_socket(self, sock_info):
296
303
"""Close and discard the active socket.
305
if sock_info not in (NO_REQUEST, NO_SOCKET_YET):
299
306
sock_info.close()
301
308
if sock_info == self._get_request_state():
309
# Discarding request socket; prepare to use a new request
310
# socket on next get_socket().
302
311
self._set_request_state(NO_SOCKET_YET)
304
def return_socket(self, sock_info):
305
"""Return the socket currently in use to the pool. If the
306
pool is full the socket will be discarded.
313
def maybe_return_socket(self, sock_info):
314
"""Return the socket to the pool unless it's the request socket.
308
316
if self.pid != os.getpid():
314
322
if sock_info != self._get_request_state():
318
if len(self.sockets) < self.max_size:
319
self.sockets.add(sock_info)
323
self._return_socket(sock_info)
325
self.discard_socket(sock_info)
325
def _return_socket(self, sock_info):
326
"""Return socket to the pool. If pool is full the socket is discarded.
330
if len(self.sockets) < self.max_size:
331
self.sockets.add(sock_info)
327
337
def _check(self, sock_info, pair):
328
338
"""This side-effecty function checks if this pool has been reset since
329
339
the last time this socket was used, or if the socket has been closed by
330
some external network error if it's been > 1 second since the last time
331
we used it, and if so, attempts to create a new socket. If this
332
connection attempt fails we reset the pool and reraise the error.
340
some external network error, and if so, attempts to create a new socket.
341
If this connection attempt fails we reset the pool and reraise the
334
344
Checking sockets lets us avoid seeing *some*
335
345
:class:`~pymongo.errors.AutoReconnect` exceptions on server
360
# Overridable methods for Pools. These methods must simply set and get an
361
# arbitrary value associated with the execution context (thread, greenlet,
362
# Tornado StackContext, ...) in which we want to use a single socket.
363
def _set_request_state(self, sock_info):
364
raise NotImplementedError
366
def _get_request_state(self):
367
raise NotImplementedError
373
# This thread-local will hold a Pool's per-thread request state. sock_info
374
# defaults to NO_REQUEST each time it's accessed from a new thread. It's
375
# much simpler to make a separate thread-local class rather than having Pool
376
# inherit both from BasePool and threading.local.
377
class _Local(threading.local):
378
sock_info = NO_REQUEST
381
class Pool(BasePool):
382
"""A simple connection pool.
384
Calling start_request() acquires a thread-local socket, which is returned
385
to the pool when the thread calls end_request() or dies.
387
def __init__(self, *args, **kwargs):
388
self.local = _Local()
389
super(Pool, self).__init__(*args, **kwargs)
391
def _set_request_state(self, sock_info):
392
self.local.sock_info = sock_info
394
def _get_request_state(self):
395
return self.local.sock_info
398
self.local.sock_info = NO_REQUEST
401
class GreenletPool(BasePool):
402
"""A simple connection pool.
404
Calling start_request() acquires a greenlet-local socket, which is returned
405
to the pool when the greenlet calls end_request() or dies.
407
def __init__(self, *args, **kwargs):
408
self._gr_id_to_sock = {}
410
# Weakrefs to non-Gevent greenlets
412
super(GreenletPool, self).__init__(*args, **kwargs)
415
def _set_request_state(self, sock_info):
416
current = greenlet.getcurrent()
373
def _set_request_state(self, sock_info):
374
tid = self._ident.get()
419
376
if sock_info == NO_REQUEST:
420
self._refs.pop(gr_id, None)
421
self._gr_id_to_sock.pop(gr_id, None)
378
self._ident.unwatch()
379
self._tid_to_sock.pop(tid, None)
423
self._gr_id_to_sock[gr_id] = sock_info
425
def delete_callback(dummy):
427
self._refs.pop(gr_id, None)
428
request_sock = self._gr_id_to_sock.pop(gr_id, None)
429
self.return_socket(request_sock)
431
if gr_id not in self._refs:
432
if hasattr(current, 'link'):
433
# This is a Gevent Greenlet (capital G), which inherits from
434
# greenlet and provides a 'link' method to detect when the
436
current.link(delete_callback)
437
self._refs[gr_id] = None
439
# This is a non-Gevent greenlet (small g), or it's the main
440
# greenlet. Since there's no link() method, we use a weakref
441
# to detect when the greenlet is garbage-collected. Garbage-
442
# collection is a later-firing and less reliable event than
443
# Greenlet.link() so we prefer link() if available.
444
self._refs[gr_id] = weakref.ref(current, delete_callback)
381
self._tid_to_sock[tid] = sock_info
383
if not self._ident.watching():
384
# Closure over tid and poolref. Don't refer directly to self,
385
# otherwise there's a cycle.
387
# Do not access threadlocals in this function, or any
388
# function it calls! In the case of the Pool subclass and
389
# mod_wsgi 2.x, on_thread_died() is triggered when mod_wsgi
390
# calls PyThreadState_Clear(), which deferences the
391
# ThreadVigil and triggers the weakref callback. Accessing
392
# thread locals in this function, while PyThreadState_Clear()
393
# is in progress can cause leaks, see PYTHON-353.
394
poolref = weakref.ref(self)
395
def on_thread_died(ref):
400
request_sock = pool._tid_to_sock.pop(tid, None)
402
# Was thread ever assigned a socket before it died?
403
if request_sock not in (NO_REQUEST, NO_SOCKET_YET):
404
pool._return_socket(request_sock)
406
# Random exceptions on interpreter shutdown.
409
self._ident.watch(on_thread_died)
446
411
def _get_request_state(self):
447
gr_id = id(greenlet.getcurrent())
448
return self._gr_id_to_sock.get(gr_id, NO_REQUEST)
451
self._gr_id_to_sock.clear()
412
tid = self._ident.get()
413
return self._tid_to_sock.get(tid, NO_REQUEST)
416
# Avoid ResourceWarnings in Python 3
417
for sock_info in self.sockets:
420
for request_sock in self._tid_to_sock.values():
421
if request_sock not in (NO_REQUEST, NO_SOCKET_YET):
455
425
class Request(object):