~ubuntu-branches/ubuntu/trusty/python-happybase/trusty

« back to all changes in this revision

Viewing changes to happybase/pool.py

  • Committer: Package Import Robot
  • Author(s): Thomas Goirand
  • Date: 2013-05-30 13:56:42 UTC
  • mfrom: (1.1.1)
  • Revision ID: package-import@ubuntu.com-20130530135642-tveld2y1dbkhmuv3
Tags: 0.6-1
* New upstream release (Closes: #712971).
* Ran wrap-and-sort.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""
 
2
HappyBase connection pool module.
 
3
"""
 
4
 
 
5
import contextlib
 
6
import logging
 
7
import Queue
 
8
import socket
 
9
import threading
 
10
 
 
11
from thrift.Thrift import TException
 
12
 
 
13
from .connection import Connection
 
14
 
 
15
logger = logging.getLogger(__name__)
 
16
 
 
17
#
 
18
# TODO: maybe support multiple Thrift servers. What would a reasonable
 
19
# distribution look like? Round-robin? Randomize the list upon
 
20
# instantiation and then cycle through it? How to handle (temporary?)
 
21
# connection errors?
 
22
#
 
23
 
 
24
 
 
25
class NoConnectionsAvailable(RuntimeError):
 
26
    """
 
27
    Exception raised when no connections are available.
 
28
 
 
29
    This happens if a timeout was specified when obtaining a connection,
 
30
    and no connection became available within the specified timeout.
 
31
 
 
32
    .. versionadded:: 0.5
 
33
    """
 
34
    pass
 
35
 
 
36
 
 
37
class ConnectionPool(object):
 
38
    """
 
39
    Thread-safe connection pool.
 
40
 
 
41
    .. versionadded:: 0.5
 
42
 
 
43
    The `size` parameter specifies how many connections this pool
 
44
    manages. Additional keyword arguments are passed unmodified to the
 
45
    :py:class:`happybase.Connection` constructor, with the exception of
 
46
    the `autoconnect` argument, since maintaining connections is the
 
47
    task of the pool.
 
48
 
 
49
    :param int size: the maximum number of concurrently open connections
 
50
    :param kwargs: keyword arguments passed to
 
51
                   :py:class:`happybase.Connection`
 
52
    """
 
53
    def __init__(self, size, **kwargs):
 
54
        if not isinstance(size, int):
 
55
            raise TypeError("Pool 'size' arg must be an integer")
 
56
 
 
57
        if not size > 0:
 
58
            raise ValueError("Pool 'size' arg must be greater than zero")
 
59
 
 
60
        logger.debug(
 
61
            "Initializing connection pool with %d connections", size)
 
62
 
 
63
        self._lock = threading.Lock()
 
64
        self._queue = Queue.LifoQueue(maxsize=size)
 
65
        self._thread_connections = threading.local()
 
66
 
 
67
        connection_kwargs = kwargs
 
68
        connection_kwargs['autoconnect'] = False
 
69
 
 
70
        for i in xrange(size):
 
71
            connection = Connection(**connection_kwargs)
 
72
            self._queue.put(connection)
 
73
 
 
74
        # The first connection is made immediately so that trivial
 
75
        # mistakes like unresolvable host names are raised immediately.
 
76
        # Subsequent connections are connected lazily.
 
77
        with self.connection():
 
78
            pass
 
79
 
 
80
    def _acquire_connection(self, timeout=None):
 
81
        """Acquire a connection from the pool."""
 
82
        try:
 
83
            return self._queue.get(True, timeout)
 
84
        except Queue.Empty:
 
85
            raise NoConnectionsAvailable(
 
86
                "No connection available from pool within specified "
 
87
                "timeout")
 
88
 
 
89
    def _return_connection(self, connection):
 
90
        """Return a connection to the pool."""
 
91
        self._queue.put(connection)
 
92
 
 
93
    @contextlib.contextmanager
 
94
    def connection(self, timeout=None):
 
95
        """
 
96
        Obtain a connection from the pool.
 
97
 
 
98
        This method *must* be used as a context manager, i.e. with
 
99
        Python's ``with`` block. Example::
 
100
 
 
101
            with pool.connection() as connection:
 
102
                pass  # do something with the connection
 
103
 
 
104
        If `timeout` is specified, this is the number of seconds to wait
 
105
        for a connection to become available before
 
106
        :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
 
107
        method waits forever for a connection to become available.
 
108
 
 
109
        :param int timeout: number of seconds to wait (optional)
 
110
        :return: active connection from the pool
 
111
        :rtype: :py:class:`happybase.Connection`
 
112
        """
 
113
 
 
114
        connection = getattr(self._thread_connections, 'current', None)
 
115
 
 
116
        return_after_use = False
 
117
        if connection is None:
 
118
            # This is the outermost connection requests for this thread.
 
119
            # Obtain a new connection from the pool and keep a reference
 
120
            # in a thread local so that nested connection requests from
 
121
            # the same thread can return the same connection instance.
 
122
            #
 
123
            # Note: this code acquires a lock before assigning to the
 
124
            # thread local; see
 
125
            # http://emptysquare.net/blog/another-thing-about-pythons-
 
126
            # threadlocals/
 
127
            return_after_use = True
 
128
            connection = self._acquire_connection(timeout)
 
129
            with self._lock:
 
130
                self._thread_connections.current = connection
 
131
 
 
132
        try:
 
133
            # Open connection, because connections are opened lazily.
 
134
            # This is a no-op for connections that are already open.
 
135
            connection.open()
 
136
 
 
137
            # Return value from the context manager's __enter__()
 
138
            yield connection
 
139
 
 
140
        except (TException, socket.error):
 
141
            # Refresh the underlying Thrift client if an exception
 
142
            # occurred in the Thrift layer, since we don't know whether
 
143
            # the connection is still usable.
 
144
            logger.info("Replacing tainted pool connection")
 
145
            connection._refresh_thrift_client()
 
146
            connection.open()
 
147
 
 
148
            # Reraise to caller; see contextlib.contextmanager() docs
 
149
            raise
 
150
 
 
151
        finally:
 
152
            # Remove thread local reference after the outermost 'with'
 
153
            # block ends. Afterwards the thread no longer owns the
 
154
            # connection.
 
155
            if return_after_use:
 
156
                del self._thread_connections.current
 
157
                self._return_connection(connection)