2
HappyBase connection pool module.
11
from thrift.Thrift import TException
13
from .connection import Connection
15
logger = logging.getLogger(__name__)
18
# TODO: maybe support multiple Thrift servers. What would a reasonable
19
# distribution look like? Round-robin? Randomize the list upon
20
# instantiation and then cycle through it? How to handle (temporary?)
25
class NoConnectionsAvailable(RuntimeError):
27
Exception raised when no connections are available.
29
This happens if a timeout was specified when obtaining a connection,
30
and no connection became available within the specified timeout.
37
class ConnectionPool(object):
39
Thread-safe connection pool.
43
The `size` parameter specifies how many connections this pool
44
manages. Additional keyword arguments are passed unmodified to the
45
:py:class:`happybase.Connection` constructor, with the exception of
46
the `autoconnect` argument, since maintaining connections is the
49
:param int size: the maximum number of concurrently open connections
50
:param kwargs: keyword arguments passed to
51
:py:class:`happybase.Connection`
53
def __init__(self, size, **kwargs):
54
if not isinstance(size, int):
55
raise TypeError("Pool 'size' arg must be an integer")
58
raise ValueError("Pool 'size' arg must be greater than zero")
61
"Initializing connection pool with %d connections", size)
63
self._lock = threading.Lock()
64
self._queue = Queue.LifoQueue(maxsize=size)
65
self._thread_connections = threading.local()
67
connection_kwargs = kwargs
68
connection_kwargs['autoconnect'] = False
70
for i in xrange(size):
71
connection = Connection(**connection_kwargs)
72
self._queue.put(connection)
74
# The first connection is made immediately so that trivial
75
# mistakes like unresolvable host names are raised immediately.
76
# Subsequent connections are connected lazily.
77
with self.connection():
80
def _acquire_connection(self, timeout=None):
81
"""Acquire a connection from the pool."""
83
return self._queue.get(True, timeout)
85
raise NoConnectionsAvailable(
86
"No connection available from pool within specified "
89
def _return_connection(self, connection):
90
"""Return a connection to the pool."""
91
self._queue.put(connection)
93
@contextlib.contextmanager
94
def connection(self, timeout=None):
96
Obtain a connection from the pool.
98
This method *must* be used as a context manager, i.e. with
99
Python's ``with`` block. Example::
101
with pool.connection() as connection:
102
pass # do something with the connection
104
If `timeout` is specified, this is the number of seconds to wait
105
for a connection to become available before
106
:py:exc:`NoConnectionsAvailable` is raised. If omitted, this
107
method waits forever for a connection to become available.
109
:param int timeout: number of seconds to wait (optional)
110
:return: active connection from the pool
111
:rtype: :py:class:`happybase.Connection`
114
connection = getattr(self._thread_connections, 'current', None)
116
return_after_use = False
117
if connection is None:
118
# This is the outermost connection requests for this thread.
119
# Obtain a new connection from the pool and keep a reference
120
# in a thread local so that nested connection requests from
121
# the same thread can return the same connection instance.
123
# Note: this code acquires a lock before assigning to the
125
# http://emptysquare.net/blog/another-thing-about-pythons-
127
return_after_use = True
128
connection = self._acquire_connection(timeout)
130
self._thread_connections.current = connection
133
# Open connection, because connections are opened lazily.
134
# This is a no-op for connections that are already open.
137
# Return value from the context manager's __enter__()
140
except (TException, socket.error):
141
# Refresh the underlying Thrift client if an exception
142
# occurred in the Thrift layer, since we don't know whether
143
# the connection is still usable.
144
logger.info("Replacing tainted pool connection")
145
connection._refresh_thrift_client()
148
# Reraise to caller; see contextlib.contextmanager() docs
152
# Remove thread local reference after the outermost 'with'
153
# block ends. Afterwards the thread no longer owns the
156
del self._thread_connections.current
157
self._return_connection(connection)