~crunch.io/ubuntu/precise/pymongo/unstable

« back to all changes in this revision

Viewing changes to pymongo/pool.py

  • Committer: Joseph Tate
  • Date: 2013-01-31 08:00:57 UTC
  • mfrom: (1.1.12)
  • Revision ID: jtate@dragonstrider.com-20130131080057-y7lv17xi6x8c1j5x
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
import threading
20
20
import weakref
21
21
 
22
 
from pymongo.errors import ConnectionFailure
 
22
from pymongo import thread_util
 
23
from pymongo.errors import ConnectionFailure, ConfigurationError
23
24
 
24
25
 
25
26
have_ssl = True
29
30
    have_ssl = False
30
31
 
31
32
 
32
 
# PyMongo does not use greenlet-aware connection pools by default, but it will
33
 
# attempt to do so if you pass use_greenlets=True to Connection or
34
 
# ReplicaSetConnection
35
 
have_greenlet = True
36
 
try:
37
 
    import greenlet
38
 
except ImportError:
39
 
    have_greenlet = False
40
 
 
41
 
 
42
33
NO_REQUEST    = None
43
34
NO_SOCKET_YET = -1
44
35
 
63
54
class SocketInfo(object):
64
55
    """Store a socket with some metadata
65
56
    """
66
 
    def __init__(self, sock, poolref):
 
57
    def __init__(self, sock, pool_id):
67
58
        self.sock = sock
68
 
 
69
 
        # We can't strongly reference the Pool, because the Pool
70
 
        # references this SocketInfo as long as it's in pool
71
 
        self.poolref = poolref
72
 
 
73
59
        self.authset = set()
74
60
        self.closed = False
75
61
        self.last_checkout = time.time()
76
 
        self.pool_id = poolref().pool_id
 
62
 
 
63
        # The pool's pool_id changes with each reset() so we can close sockets
 
64
        # created before the last reset.
 
65
        self.pool_id = pool_id
77
66
 
78
67
    def close(self):
79
68
        self.closed = True
83
72
        except:
84
73
            pass
85
74
 
86
 
    def __del__(self):
87
 
        if not self.closed:
88
 
            # This socket was given out, but not explicitly returned. Perhaps
89
 
            # the socket was assigned to a thread local for a request, but the
90
 
            # request wasn't ended before the thread died. Reclaim the socket
91
 
            # for the pool.
92
 
            pool = self.poolref()
93
 
            if pool:
94
 
                # Return a copy of self rather than self -- the Python docs
95
 
                # discourage postponing deletion by adding a reference to self.
96
 
                copy = SocketInfo(self.sock, self.poolref)
97
 
                copy.authset = self.authset
98
 
                pool.return_socket(copy)
99
 
            else:
100
 
                # Close socket now rather than awaiting garbage collector
101
 
                self.close()
102
 
 
103
75
    def __eq__(self, other):
 
76
        # Need to check if other is NO_REQUEST or NO_SOCKET_YET, and then check
 
77
        # if its sock is the same as ours
104
78
        return hasattr(other, 'sock') and self.sock == other.sock
105
79
 
 
80
    def __ne__(self, other):
 
81
        return not self == other
 
82
 
106
83
    def __hash__(self):
107
84
        return hash(self.sock)
108
85
 
109
86
    def __repr__(self):
110
 
        return "SocketInfo(%s, %s)%s at %s" % (
111
 
            repr(self.sock), repr(self.poolref()),
 
87
        return "SocketInfo(%s)%s at %s" % (
 
88
            repr(self.sock),
112
89
            self.closed and " CLOSED" or "",
113
90
            id(self)
114
91
        )
115
92
 
116
93
 
117
 
class BasePool(object):
118
 
    def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl):
 
94
# Do *not* explicitly inherit from object or Jython won't call __del__
 
95
# http://bugs.jython.org/issue1057
 
96
class Pool:
 
97
    def __init__(self, pair, max_size, net_timeout, conn_timeout, use_ssl,
 
98
                 use_greenlets):
119
99
        """
120
100
        :Parameters:
121
101
          - `pair`: a (hostname, port) tuple
123
103
          - `net_timeout`: timeout in seconds for operations on open connection
124
104
          - `conn_timeout`: timeout in seconds for establishing connection
125
105
          - `use_ssl`: bool, if True use an encrypted connection
 
106
          - `use_greenlets`: bool, if True then start_request() assigns a
 
107
              socket to the current greenlet - otherwise it is assigned to the
 
108
              current thread
126
109
        """
 
110
        if use_greenlets and not thread_util.have_greenlet:
 
111
            raise ConfigurationError(
 
112
                "The greenlet module is not available. "
 
113
                "Install the greenlet package from PyPI."
 
114
            )
 
115
 
127
116
        self.sockets = set()
128
117
        self.lock = threading.Lock()
129
118
 
136
125
        self.net_timeout = net_timeout
137
126
        self.conn_timeout = conn_timeout
138
127
        self.use_ssl = use_ssl
 
128
        self._ident = thread_util.create_ident(use_greenlets)
 
129
 
 
130
        # Map self._ident.get() -> request socket
 
131
        self._tid_to_sock = {}
 
132
 
 
133
        # Count the number of calls to start_request() per thread or greenlet
 
134
        self._request_counter = thread_util.Counter(use_greenlets)
139
135
 
140
136
    def reset(self):
141
137
        # Ignore this race condition -- if many threads are resetting at once,
142
138
        # the pool_id will definitely change, which is all we care about.
143
139
        self.pool_id += 1
144
 
 
145
 
        request_state = self._get_request_state()
146
140
        self.pid = os.getpid()
147
141
 
148
 
        # Close sockets before deleting them, otherwise they'll come
149
 
        # running back.
150
 
        if request_state not in (NO_REQUEST, NO_SOCKET_YET):
151
 
            # request_state is a SocketInfo for this request
152
 
            request_state.close()
153
 
 
154
142
        sockets = None
155
143
        try:
156
144
            # Swapping variables is not atomic. We need to ensure no other
163
151
 
164
152
        for sock_info in sockets: sock_info.close()
165
153
 
166
 
        # Reset subclass's data structures
167
 
        self._reset()
168
 
 
169
 
        # If we were in a request before the reset, then delete the request
170
 
        # socket, but resume the request with a new socket the next time
171
 
        # get_socket() is called.
172
 
        if request_state != NO_REQUEST:
173
 
            self._set_request_state(NO_SOCKET_YET)
174
 
 
175
154
    def create_connection(self, pair):
176
155
        """Connect to *pair* and return the socket object.
177
156
 
178
157
        This is a modified version of create_connection from
179
158
        CPython >=2.6.
180
159
        """
181
 
        # Don't try IPv6 if we don't support it.
 
160
        host, port = pair or self.pair
 
161
 
 
162
        # Check if dealing with a unix domain socket
 
163
        if host.endswith('.sock'):
 
164
            if not hasattr(socket, "AF_UNIX"):
 
165
                raise ConnectionFailure("UNIX-sockets are not supported "
 
166
                                        "on this system")
 
167
            sock = socket.socket(socket.AF_UNIX)
 
168
            try:
 
169
                sock.connect(host)
 
170
                return sock
 
171
            except socket.error, e:
 
172
                if sock is not None:
 
173
                    sock.close()
 
174
                raise e
 
175
 
 
176
        # Don't try IPv6 if we don't support it. Also skip it if host
 
177
        # is 'localhost' (::1 is fine). Avoids slow connect issues
 
178
        # like PYTHON-356.
182
179
        family = socket.AF_INET
183
 
        if socket.has_ipv6:
 
180
        if socket.has_ipv6 and host != 'localhost':
184
181
            family = socket.AF_UNSPEC
185
182
 
186
 
        host, port = pair or self.pair
187
183
        err = None
188
184
        for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
189
185
            af, socktype, proto, dummy, sa = res
224
220
                                        "not be configured with SSL support.")
225
221
 
226
222
        sock.settimeout(self.net_timeout)
227
 
        return SocketInfo(sock, weakref.ref(self))
 
223
        return SocketInfo(sock, self.pool_id)
228
224
 
229
225
    def get_socket(self, pair=None):
230
226
        """Get a socket from the pool.
257
253
        sock_info, from_pool = None, None
258
254
        try:
259
255
            try:
260
 
                # set.pop() isn't atomic in Jython, see
 
256
                # set.pop() isn't atomic in Jython less than 2.7, see
261
257
                # http://bugs.jython.org/issue1854
262
258
                self.lock.acquire()
263
259
                sock_info, from_pool = self.sockets.pop(), True
284
280
            # have no socket assigned to the request yet.
285
281
            self._set_request_state(NO_SOCKET_YET)
286
282
 
 
283
        self._request_counter.inc()
 
284
 
287
285
    def in_request(self):
288
 
        return self._get_request_state() != NO_REQUEST
 
286
        return bool(self._request_counter.get())
289
287
 
290
288
    def end_request(self):
291
 
        sock_info = self._get_request_state()
292
 
        self._set_request_state(NO_REQUEST)
293
 
        self.return_socket(sock_info)
 
289
        tid = self._ident.get()
 
290
 
 
291
        # Check if start_request has ever been called in this thread / greenlet
 
292
        count = self._request_counter.get()
 
293
        if count:
 
294
            self._request_counter.dec()
 
295
            if count == 1:
 
296
                # End request
 
297
                sock_info = self._get_request_state()
 
298
                self._set_request_state(NO_REQUEST)
 
299
                if sock_info not in (NO_REQUEST, NO_SOCKET_YET):
 
300
                    self._return_socket(sock_info)
294
301
 
295
302
    def discard_socket(self, sock_info):
296
303
        """Close and discard the active socket.
297
304
        """
298
 
        if sock_info:
 
305
        if sock_info not in (NO_REQUEST, NO_SOCKET_YET):
299
306
            sock_info.close()
300
307
 
301
308
            if sock_info == self._get_request_state():
 
309
                # Discarding request socket; prepare to use a new request
 
310
                # socket on next get_socket().
302
311
                self._set_request_state(NO_SOCKET_YET)
303
312
 
304
 
    def return_socket(self, sock_info):
305
 
        """Return the socket currently in use to the pool. If the
306
 
        pool is full the socket will be discarded.
 
313
    def maybe_return_socket(self, sock_info):
 
314
        """Return the socket to the pool unless it's the request socket.
307
315
        """
308
316
        if self.pid != os.getpid():
309
317
            self.reset()
312
320
                return
313
321
 
314
322
            if sock_info != self._get_request_state():
315
 
                added = False
316
 
                try:
317
 
                    self.lock.acquire()
318
 
                    if len(self.sockets) < self.max_size:
319
 
                        self.sockets.add(sock_info)
320
 
                        added = True
321
 
                finally:
322
 
                    self.lock.release()
 
323
                self._return_socket(sock_info)
323
324
 
324
 
                if not added:
325
 
                    self.discard_socket(sock_info)
 
325
    def _return_socket(self, sock_info):
 
326
        """Return socket to the pool. If pool is full the socket is discarded.
 
327
        """
 
328
        try:
 
329
            self.lock.acquire()
 
330
            if len(self.sockets) < self.max_size:
 
331
                self.sockets.add(sock_info)
 
332
            else:
 
333
                sock_info.close()
 
334
        finally:
 
335
            self.lock.release()
326
336
 
327
337
    def _check(self, sock_info, pair):
328
338
        """This side-effecty function checks if this pool has been reset since
329
339
        the last time this socket was used, or if the socket has been closed by
330
 
        some external network error if it's been > 1 second since the last time
331
 
        we used it, and if so, attempts to create a new socket. If this
332
 
        connection attempt fails we reset the pool and reraise the error.
 
340
        some external network error, and if so, attempts to create a new socket.
 
341
        If this connection attempt fails we reset the pool and reraise the
 
342
        error.
333
343
 
334
344
        Checking sockets lets us avoid seeing *some*
335
345
        :class:`~pymongo.errors.AutoReconnect` exceptions on server
339
349
        """
340
350
        error = False
341
351
 
342
 
        if self.pool_id != sock_info.pool_id:
343
 
            self.discard_socket(sock_info)
 
352
        if sock_info.closed:
 
353
            error = True
 
354
 
 
355
        elif self.pool_id != sock_info.pool_id:
 
356
            sock_info.close()
344
357
            error = True
345
358
 
346
359
        elif time.time() - sock_info.last_checkout > 1:
347
360
            if _closed(sock_info.sock):
348
 
                self.discard_socket(sock_info)
 
361
                sock_info.close()
349
362
                error = True
350
363
 
351
364
        if not error:
357
370
                self.reset()
358
371
                raise
359
372
 
360
 
    # Overridable methods for Pools. These methods must simply set and get an
361
 
    # arbitrary value associated with the execution context (thread, greenlet,
362
 
    # Tornado StackContext, ...) in which we want to use a single socket.
363
 
    def _set_request_state(self, sock_info):
364
 
        raise NotImplementedError
365
 
 
366
 
    def _get_request_state(self):
367
 
        raise NotImplementedError
368
 
 
369
 
    def _reset(self):
370
 
        pass
371
 
 
372
 
 
373
 
# This thread-local will hold a Pool's per-thread request state. sock_info
374
 
# defaults to NO_REQUEST each time it's accessed from a new thread. It's
375
 
# much simpler to make a separate thread-local class rather than having Pool
376
 
# inherit both from BasePool and threading.local.
377
 
class _Local(threading.local):
378
 
    sock_info = NO_REQUEST
379
 
 
380
 
 
381
 
class Pool(BasePool):
382
 
    """A simple connection pool.
383
 
 
384
 
    Calling start_request() acquires a thread-local socket, which is returned
385
 
    to the pool when the thread calls end_request() or dies.
386
 
    """
387
 
    def __init__(self, *args, **kwargs):
388
 
        self.local = _Local()
389
 
        super(Pool, self).__init__(*args, **kwargs)
390
 
 
391
 
    def _set_request_state(self, sock_info):
392
 
        self.local.sock_info = sock_info
393
 
 
394
 
    def _get_request_state(self):
395
 
        return self.local.sock_info
396
 
 
397
 
    def _reset(self):
398
 
        self.local.sock_info = NO_REQUEST
399
 
 
400
 
 
401
 
class GreenletPool(BasePool):
402
 
    """A simple connection pool.
403
 
 
404
 
    Calling start_request() acquires a greenlet-local socket, which is returned
405
 
    to the pool when the greenlet calls end_request() or dies.
406
 
    """
407
 
    def __init__(self, *args, **kwargs):
408
 
        self._gr_id_to_sock = {}
409
 
 
410
 
        # Weakrefs to non-Gevent greenlets
411
 
        self._refs = {}
412
 
        super(GreenletPool, self).__init__(*args, **kwargs)
413
 
 
414
 
    # Overrides
415
 
    def _set_request_state(self, sock_info):
416
 
        current = greenlet.getcurrent()
417
 
        gr_id = id(current)
 
373
    def _set_request_state(self, sock_info):
 
374
        tid = self._ident.get()
418
375
 
419
376
        if sock_info == NO_REQUEST:
420
 
            self._refs.pop(gr_id, None)
421
 
            self._gr_id_to_sock.pop(gr_id, None)
 
377
            # Ending a request
 
378
            self._ident.unwatch()
 
379
            self._tid_to_sock.pop(tid, None)
422
380
        else:
423
 
            self._gr_id_to_sock[gr_id] = sock_info
424
 
 
425
 
            def delete_callback(dummy):
426
 
                # End the request
427
 
                self._refs.pop(gr_id, None)
428
 
                request_sock = self._gr_id_to_sock.pop(gr_id, None)
429
 
                self.return_socket(request_sock)
430
 
 
431
 
            if gr_id not in self._refs:
432
 
                if hasattr(current, 'link'):
433
 
                    # This is a Gevent Greenlet (capital G), which inherits from
434
 
                    # greenlet and provides a 'link' method to detect when the
435
 
                    # Greenlet exits
436
 
                    current.link(delete_callback)
437
 
                    self._refs[gr_id] = None
438
 
                else:
439
 
                    # This is a non-Gevent greenlet (small g), or it's the main
440
 
                    # greenlet. Since there's no link() method, we use a weakref
441
 
                    # to detect when the greenlet is garbage-collected. Garbage-
442
 
                    # collection is a later-firing and less reliable event than
443
 
                    # Greenlet.link() so we prefer link() if available.
444
 
                    self._refs[gr_id] = weakref.ref(current, delete_callback)
 
381
            self._tid_to_sock[tid] = sock_info
 
382
 
 
383
            if not self._ident.watching():
 
384
                # Closure over tid and poolref. Don't refer directly to self,
 
385
                # otherwise there's a cycle.
 
386
 
 
387
                # Do not access threadlocals in this function, or any
 
388
                # function it calls! In the case of the Pool subclass and
 
389
                # mod_wsgi 2.x, on_thread_died() is triggered when mod_wsgi
 
390
                # calls PyThreadState_Clear(), which deferences the
 
391
                # ThreadVigil and triggers the weakref callback. Accessing
 
392
                # thread locals in this function, while PyThreadState_Clear()
 
393
                # is in progress can cause leaks, see PYTHON-353.
 
394
                poolref = weakref.ref(self)
 
395
                def on_thread_died(ref):
 
396
                    try:
 
397
                        pool = poolref()
 
398
                        if pool:
 
399
                            # End the request
 
400
                            request_sock = pool._tid_to_sock.pop(tid, None)
 
401
 
 
402
                            # Was thread ever assigned a socket before it died?
 
403
                            if request_sock not in (NO_REQUEST, NO_SOCKET_YET):
 
404
                                pool._return_socket(request_sock)
 
405
                    except:
 
406
                        # Random exceptions on interpreter shutdown.
 
407
                        pass
 
408
 
 
409
                self._ident.watch(on_thread_died)
445
410
 
446
411
    def _get_request_state(self):
447
 
        gr_id = id(greenlet.getcurrent())
448
 
        return self._gr_id_to_sock.get(gr_id, NO_REQUEST)
449
 
 
450
 
    def _reset(self):
451
 
        self._gr_id_to_sock.clear()
452
 
        self._refs.clear()
 
412
        tid = self._ident.get()
 
413
        return self._tid_to_sock.get(tid, NO_REQUEST)
 
414
 
 
415
    def __del__(self):
 
416
        # Avoid ResourceWarnings in Python 3
 
417
        for sock_info in self.sockets:
 
418
            sock_info.close()
 
419
 
 
420
        for request_sock in self._tid_to_sock.values():
 
421
            if request_sock not in (NO_REQUEST, NO_SOCKET_YET):
 
422
                request_sock.close()
453
423
 
454
424
 
455
425
class Request(object):